// iceberg/spec/manifest/writer.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::cmp::min;

use apache_avro::{Writer as AvroWriter, to_value};
use bytes::Bytes;
use itertools::Itertools;
use serde_json::to_vec;

use super::{
    Datum, FormatVersion, ManifestContentType, PartitionSpec, PrimitiveType,
    UNASSIGNED_SEQUENCE_NUMBER,
};
use crate::error::Result;
use crate::io::OutputFile;
use crate::spec::manifest::_serde::{ManifestEntryV1, ManifestEntryV2};
use crate::spec::manifest::{manifest_schema_v1, manifest_schema_v2};
use crate::spec::{
    DataContentType, DataFile, FieldSummary, ManifestEntry, ManifestFile, ManifestMetadata,
    ManifestStatus, PrimitiveLiteral, SchemaRef, StructType, UNASSIGNED_SNAPSHOT_ID,
};
use crate::{Error, ErrorKind};

39/// The builder used to create a [`ManifestWriter`].
40pub struct ManifestWriterBuilder {
41    output: OutputFile,
42    snapshot_id: Option<i64>,
43    key_metadata: Option<Vec<u8>>,
44    schema: SchemaRef,
45    partition_spec: PartitionSpec,
46}
47
48impl ManifestWriterBuilder {
49    /// Create a new builder.
50    pub fn new(
51        output: OutputFile,
52        snapshot_id: Option<i64>,
53        key_metadata: Option<Vec<u8>>,
54        schema: SchemaRef,
55        partition_spec: PartitionSpec,
56    ) -> Self {
57        Self {
58            output,
59            snapshot_id,
60            key_metadata,
61            schema,
62            partition_spec,
63        }
64    }
65
66    /// Build a [`ManifestWriter`] for format version 1.
67    pub fn build_v1(self) -> ManifestWriter {
68        let metadata = ManifestMetadata::builder()
69            .schema_id(self.schema.schema_id())
70            .schema(self.schema)
71            .partition_spec(self.partition_spec)
72            .format_version(FormatVersion::V1)
73            .content(ManifestContentType::Data)
74            .build();
75        ManifestWriter::new(
76            self.output,
77            self.snapshot_id,
78            self.key_metadata,
79            metadata,
80            None,
81        )
82    }
83
84    /// Build a [`ManifestWriter`] for format version 2, data content.
85    pub fn build_v2_data(self) -> ManifestWriter {
86        let metadata = ManifestMetadata::builder()
87            .schema_id(self.schema.schema_id())
88            .schema(self.schema)
89            .partition_spec(self.partition_spec)
90            .format_version(FormatVersion::V2)
91            .content(ManifestContentType::Data)
92            .build();
93        ManifestWriter::new(
94            self.output,
95            self.snapshot_id,
96            self.key_metadata,
97            metadata,
98            None,
99        )
100    }
101
102    /// Build a [`ManifestWriter`] for format version 2, deletes content.
103    pub fn build_v2_deletes(self) -> ManifestWriter {
104        let metadata = ManifestMetadata::builder()
105            .schema_id(self.schema.schema_id())
106            .schema(self.schema)
107            .partition_spec(self.partition_spec)
108            .format_version(FormatVersion::V2)
109            .content(ManifestContentType::Deletes)
110            .build();
111        ManifestWriter::new(
112            self.output,
113            self.snapshot_id,
114            self.key_metadata,
115            metadata,
116            None,
117        )
118    }
119
120    /// Build a [`ManifestWriter`] for format version 2, data content.
121    pub fn build_v3_data(self) -> ManifestWriter {
122        let metadata = ManifestMetadata::builder()
123            .schema_id(self.schema.schema_id())
124            .schema(self.schema)
125            .partition_spec(self.partition_spec)
126            .format_version(FormatVersion::V3)
127            .content(ManifestContentType::Data)
128            .build();
129        ManifestWriter::new(
130            self.output,
131            self.snapshot_id,
132            self.key_metadata,
133            metadata,
134            // First row id is assigned by the [`ManifestListWriter`] when the manifest
135            // is added to the list.
136            None,
137        )
138    }
139
140    /// Build a [`ManifestWriter`] for format version 3, deletes content.
141    pub fn build_v3_deletes(self) -> ManifestWriter {
142        let metadata = ManifestMetadata::builder()
143            .schema_id(self.schema.schema_id())
144            .schema(self.schema)
145            .partition_spec(self.partition_spec)
146            .format_version(FormatVersion::V3)
147            .content(ManifestContentType::Deletes)
148            .build();
149        ManifestWriter::new(
150            self.output,
151            self.snapshot_id,
152            self.key_metadata,
153            metadata,
154            None,
155        )
156    }
157}
158
159/// A manifest writer.
160pub struct ManifestWriter {
161    output: OutputFile,
162
163    snapshot_id: Option<i64>,
164
165    added_files: u32,
166    added_rows: u64,
167    existing_files: u32,
168    existing_rows: u64,
169    deleted_files: u32,
170    deleted_rows: u64,
171    first_row_id: Option<u64>,
172
173    min_seq_num: Option<i64>,
174
175    key_metadata: Option<Vec<u8>>,
176
177    manifest_entries: Vec<ManifestEntry>,
178
179    metadata: ManifestMetadata,
180}
181
182impl ManifestWriter {
183    /// Create a new manifest writer.
184    pub(crate) fn new(
185        output: OutputFile,
186        snapshot_id: Option<i64>,
187        key_metadata: Option<Vec<u8>>,
188        metadata: ManifestMetadata,
189        first_row_id: Option<u64>,
190    ) -> Self {
191        Self {
192            output,
193            snapshot_id,
194            added_files: 0,
195            added_rows: 0,
196            existing_files: 0,
197            existing_rows: 0,
198            deleted_files: 0,
199            deleted_rows: 0,
200            first_row_id,
201            min_seq_num: None,
202            key_metadata,
203            manifest_entries: Vec::new(),
204            metadata,
205        }
206    }
207
208    fn construct_partition_summaries(
209        &mut self,
210        partition_type: &StructType,
211    ) -> Result<Vec<FieldSummary>> {
212        let mut field_stats: Vec<_> = partition_type
213            .fields()
214            .iter()
215            .map(|f| PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone()))
216            .collect();
217        for partition in self.manifest_entries.iter().map(|e| &e.data_file.partition) {
218            for (literal, stat) in partition.iter().zip_eq(field_stats.iter_mut()) {
219                let primitive_literal = literal.map(|v| v.as_primitive_literal().unwrap());
220                stat.update(primitive_literal)?;
221            }
222        }
223        Ok(field_stats.into_iter().map(|stat| stat.finish()).collect())
224    }
225
226    fn check_data_file(&self, data_file: &DataFile) -> Result<()> {
227        match self.metadata.content {
228            ManifestContentType::Data => {
229                if data_file.content != DataContentType::Data {
230                    return Err(Error::new(
231                        ErrorKind::DataInvalid,
232                        format!(
233                            "Date file at path {} with manifest content type `data`, should have DataContentType `Data`, but has `{:?}`",
234                            data_file.file_path(),
235                            data_file.content
236                        ),
237                    ));
238                }
239            }
240            ManifestContentType::Deletes => {
241                if data_file.content != DataContentType::EqualityDeletes
242                    && data_file.content != DataContentType::PositionDeletes
243                {
244                    return Err(Error::new(
245                        ErrorKind::DataInvalid,
246                        format!(
247                            "Date file at path {} with manifest content type `deletes`, should have DataContentType `Data`, but has `{:?}`",
248                            data_file.file_path(),
249                            data_file.content
250                        ),
251                    ));
252                }
253            }
254        }
255        Ok(())
256    }
257
258    /// Add a new manifest entry. This method will update following status of the entry:
259    /// - Update the entry status to `Added`
260    /// - Set the snapshot id to the current snapshot id
261    /// - Set the sequence number to `None` if it is invalid(smaller than 0)
262    /// - Set the file sequence number to `None`
263    pub(crate) fn add_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
264        self.check_data_file(&entry.data_file)?;
265        if entry.sequence_number().is_some_and(|n| n >= 0) {
266            entry.status = ManifestStatus::Added;
267            entry.snapshot_id = self.snapshot_id;
268            entry.file_sequence_number = None;
269        } else {
270            entry.status = ManifestStatus::Added;
271            entry.snapshot_id = self.snapshot_id;
272            entry.sequence_number = None;
273            entry.file_sequence_number = None;
274        };
275        self.add_entry_inner(entry)?;
276        Ok(())
277    }
278
279    /// Add file as an added entry with a specific sequence number. The entry's snapshot ID will be this manifest's snapshot ID. The entry's data sequence
280    /// number will be the provided data sequence number. The entry's file sequence number will be
281    /// assigned at commit.
282    pub fn add_file(&mut self, data_file: DataFile, sequence_number: i64) -> Result<()> {
283        self.check_data_file(&data_file)?;
284        let entry = ManifestEntry {
285            status: ManifestStatus::Added,
286            snapshot_id: self.snapshot_id,
287            sequence_number: (sequence_number >= 0).then_some(sequence_number),
288            file_sequence_number: None,
289            data_file,
290        };
291        self.add_entry_inner(entry)?;
292        Ok(())
293    }
294
295    /// Add a delete manifest entry. This method will update following status of the entry:
296    /// - Update the entry status to `Deleted`
297    /// - Set the snapshot id to the current snapshot id
298    ///
299    /// # TODO
300    /// Remove this allow later
301    #[allow(dead_code)]
302    pub(crate) fn add_delete_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
303        self.check_data_file(&entry.data_file)?;
304        entry.status = ManifestStatus::Deleted;
305        entry.snapshot_id = self.snapshot_id;
306        self.add_entry_inner(entry)?;
307        Ok(())
308    }
309
310    /// Add a file as delete manifest entry. The entry's snapshot ID will be this manifest's snapshot ID.
311    /// However, the original data and file sequence numbers of the file must be preserved when
312    /// the file is marked as deleted.
313    pub fn add_delete_file(
314        &mut self,
315        data_file: DataFile,
316        sequence_number: i64,
317        file_sequence_number: Option<i64>,
318    ) -> Result<()> {
319        self.check_data_file(&data_file)?;
320        let entry = ManifestEntry {
321            status: ManifestStatus::Deleted,
322            snapshot_id: self.snapshot_id,
323            sequence_number: Some(sequence_number),
324            file_sequence_number,
325            data_file,
326        };
327        self.add_entry_inner(entry)?;
328        Ok(())
329    }
330
331    /// Add an existing manifest entry. This method will update following status of the entry:
332    /// - Update the entry status to `Existing`
333    ///
334    /// # TODO
335    /// Remove this allow later
336    #[allow(dead_code)]
337    pub(crate) fn add_existing_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
338        self.check_data_file(&entry.data_file)?;
339        entry.status = ManifestStatus::Existing;
340        self.add_entry_inner(entry)?;
341        Ok(())
342    }
343
344    /// Add an file as existing manifest entry. The original data and file sequence numbers, snapshot ID,
345    /// which were assigned at commit, must be preserved when adding an existing entry.
346    pub fn add_existing_file(
347        &mut self,
348        data_file: DataFile,
349        snapshot_id: i64,
350        sequence_number: i64,
351        file_sequence_number: Option<i64>,
352    ) -> Result<()> {
353        self.check_data_file(&data_file)?;
354        let entry = ManifestEntry {
355            status: ManifestStatus::Existing,
356            snapshot_id: Some(snapshot_id),
357            sequence_number: Some(sequence_number),
358            file_sequence_number,
359            data_file,
360        };
361        self.add_entry_inner(entry)?;
362        Ok(())
363    }
364
365    fn add_entry_inner(&mut self, entry: ManifestEntry) -> Result<()> {
366        // Check if the entry has sequence number
367        if (entry.status == ManifestStatus::Deleted || entry.status == ManifestStatus::Existing)
368            && (entry.sequence_number.is_none() || entry.file_sequence_number.is_none())
369        {
370            return Err(Error::new(
371                ErrorKind::DataInvalid,
372                "Manifest entry with status Existing or Deleted should have sequence number",
373            ));
374        }
375
376        // Update the statistics
377        match entry.status {
378            ManifestStatus::Added => {
379                self.added_files += 1;
380                self.added_rows += entry.data_file.record_count;
381            }
382            ManifestStatus::Deleted => {
383                self.deleted_files += 1;
384                self.deleted_rows += entry.data_file.record_count;
385            }
386            ManifestStatus::Existing => {
387                self.existing_files += 1;
388                self.existing_rows += entry.data_file.record_count;
389            }
390        }
391        if entry.is_alive()
392            && let Some(seq_num) = entry.sequence_number
393        {
394            self.min_seq_num = Some(self.min_seq_num.map_or(seq_num, |v| min(v, seq_num)));
395        }
396        self.manifest_entries.push(entry);
397        Ok(())
398    }
399
400    /// Write manifest file and return it.
401    pub async fn write_manifest_file(mut self) -> Result<ManifestFile> {
402        // Create the avro writer
403        let partition_type = self
404            .metadata
405            .partition_spec
406            .partition_type(&self.metadata.schema)?;
407        let table_schema = &self.metadata.schema;
408        let avro_schema = match self.metadata.format_version {
409            FormatVersion::V1 => manifest_schema_v1(&partition_type)?,
410            // Manifest schema did not change between V2 and V3
411            FormatVersion::V2 | FormatVersion::V3 => manifest_schema_v2(&partition_type)?,
412        };
413        let mut avro_writer = AvroWriter::new(&avro_schema, Vec::new());
414        avro_writer.add_user_metadata(
415            "schema".to_string(),
416            to_vec(table_schema).map_err(|err| {
417                Error::new(ErrorKind::DataInvalid, "Fail to serialize table schema")
418                    .with_source(err)
419            })?,
420        )?;
421        avro_writer.add_user_metadata(
422            "schema-id".to_string(),
423            table_schema.schema_id().to_string(),
424        )?;
425        avro_writer.add_user_metadata(
426            "partition-spec".to_string(),
427            to_vec(&self.metadata.partition_spec.fields()).map_err(|err| {
428                Error::new(ErrorKind::DataInvalid, "Fail to serialize partition spec")
429                    .with_source(err)
430            })?,
431        )?;
432        avro_writer.add_user_metadata(
433            "partition-spec-id".to_string(),
434            self.metadata.partition_spec.spec_id().to_string(),
435        )?;
436        avro_writer.add_user_metadata(
437            "format-version".to_string(),
438            (self.metadata.format_version as u8).to_string(),
439        )?;
440        match self.metadata.format_version {
441            FormatVersion::V1 => {}
442            FormatVersion::V2 | FormatVersion::V3 => {
443                avro_writer
444                    .add_user_metadata("content".to_string(), self.metadata.content.to_string())?;
445            }
446        }
447
448        let partition_summary = self.construct_partition_summaries(&partition_type)?;
449        // Write manifest entries
450        for entry in std::mem::take(&mut self.manifest_entries) {
451            let value = match self.metadata.format_version {
452                FormatVersion::V1 => to_value(ManifestEntryV1::try_from(entry, &partition_type)?)?
453                    .resolve(&avro_schema)?,
454                // Manifest entry format did not change between V2 and V3
455                FormatVersion::V2 | FormatVersion::V3 => {
456                    to_value(ManifestEntryV2::try_from(entry, &partition_type)?)?
457                        .resolve(&avro_schema)?
458                }
459            };
460
461            avro_writer.append(value)?;
462        }
463
464        let content = avro_writer.into_inner()?;
465        let length = content.len();
466        self.output.write(Bytes::from(content)).await?;
467
468        Ok(ManifestFile {
469            manifest_path: self.output.location().to_string(),
470            manifest_length: length as i64,
471            partition_spec_id: self.metadata.partition_spec.spec_id(),
472            content: self.metadata.content,
473            // sequence_number and min_sequence_number with UNASSIGNED_SEQUENCE_NUMBER will be replace with
474            // real sequence number in `ManifestListWriter`.
475            sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
476            min_sequence_number: self.min_seq_num.unwrap_or(UNASSIGNED_SEQUENCE_NUMBER),
477            added_snapshot_id: self.snapshot_id.unwrap_or(UNASSIGNED_SNAPSHOT_ID),
478            added_files_count: Some(self.added_files),
479            existing_files_count: Some(self.existing_files),
480            deleted_files_count: Some(self.deleted_files),
481            added_rows_count: Some(self.added_rows),
482            existing_rows_count: Some(self.existing_rows),
483            deleted_rows_count: Some(self.deleted_rows),
484            partitions: Some(partition_summary),
485            key_metadata: self.key_metadata,
486            first_row_id: self.first_row_id,
487        })
488    }
489}
490
491struct PartitionFieldStats {
492    partition_type: PrimitiveType,
493
494    contains_null: bool,
495    contains_nan: Option<bool>,
496    lower_bound: Option<Datum>,
497    upper_bound: Option<Datum>,
498}
499
500impl PartitionFieldStats {
501    pub(crate) fn new(partition_type: PrimitiveType) -> Self {
502        Self {
503            partition_type,
504            contains_null: false,
505            contains_nan: Some(false),
506            upper_bound: None,
507            lower_bound: None,
508        }
509    }
510
511    pub(crate) fn update(&mut self, value: Option<PrimitiveLiteral>) -> Result<()> {
512        let Some(value) = value else {
513            self.contains_null = true;
514            return Ok(());
515        };
516        if !self.partition_type.compatible(&value) {
517            return Err(Error::new(
518                ErrorKind::DataInvalid,
519                "value is not compatible with type",
520            ));
521        }
522        let value = Datum::new(self.partition_type.clone(), value);
523
524        if value.is_nan() {
525            self.contains_nan = Some(true);
526            return Ok(());
527        }
528
529        self.lower_bound = Some(self.lower_bound.take().map_or(value.clone(), |original| {
530            if value < original {
531                value.clone()
532            } else {
533                original
534            }
535        }));
536        self.upper_bound = Some(self.upper_bound.take().map_or(value.clone(), |original| {
537            if value > original { value } else { original }
538        }));
539
540        Ok(())
541    }
542
543    pub(crate) fn finish(self) -> FieldSummary {
544        FieldSummary {
545            contains_null: self.contains_null,
546            contains_nan: self.contains_nan,
547            upper_bound: self.upper_bound.map(|v| v.to_bytes().unwrap()),
548            lower_bound: self.lower_bound.map(|v| v.to_bytes().unwrap()),
549        }
550    }
551}
552
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::fs;
    use std::sync::Arc;

    use tempfile::TempDir;

    use super::*;
    use crate::io::FileIOBuilder;
    use crate::spec::{DataFileFormat, Manifest, NestedField, PrimitiveType, Schema, Struct, Type};

    /// The unpartitioned Parquet data file shared by every entry of `test_add_delete_existing`.
    fn shared_data_file() -> DataFile {
        DataFile {
            content: DataContentType::Data,
            file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
            file_format: DataFileFormat::Parquet,
            partition: Struct::empty(),
            record_count: 1,
            file_size_in_bytes: 5442,
            column_sizes: HashMap::from([(1, 61), (2, 73)]),
            value_counts: HashMap::from([(1, 1), (2, 1)]),
            null_value_counts: HashMap::from([(1, 0), (2, 0)]),
            nan_value_counts: HashMap::new(),
            lower_bounds: HashMap::new(),
            upper_bounds: HashMap::new(),
            key_metadata: Some(Vec::new()),
            split_offsets: Some(vec![4]),
            equality_ids: None,
            sort_order_id: None,
            partition_spec_id: 0,
            first_row_id: None,
            referenced_data_file: None,
            content_offset: None,
            content_size_in_bytes: None,
        }
    }

    /// An entry for `shared_data_file` with the given status and snapshot id; both sequence
    /// numbers are 1.
    fn shared_entry(status: ManifestStatus, snapshot_id: Option<i64>) -> ManifestEntry {
        ManifestEntry {
            status,
            snapshot_id,
            sequence_number: Some(1),
            file_sequence_number: Some(1),
            data_file: shared_data_file(),
        }
    }

    #[tokio::test]
    async fn test_add_delete_existing() {
        let id_field = NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Int));
        let name_field = NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String));
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![Arc::new(id_field), Arc::new(name_field)])
                .build()
                .unwrap(),
        );
        let spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap();
        let metadata = ManifestMetadata {
            schema_id: 0,
            schema: schema.clone(),
            partition_spec: spec,
            content: ManifestContentType::Data,
            format_version: FormatVersion::V2,
        };
        let mut expected_entries = vec![
            shared_entry(ManifestStatus::Added, None),
            shared_entry(ManifestStatus::Deleted, Some(1)),
            shared_entry(ManifestStatus::Existing, Some(1)),
        ];

        // Write one entry of each kind to a temporary manifest file.
        let tmp_dir = TempDir::new().unwrap();
        let manifest_path = tmp_dir.path().join("test_manifest.avro");
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let output_file = file_io.new_output(manifest_path.to_str().unwrap()).unwrap();
        let mut writer = ManifestWriterBuilder::new(
            output_file,
            Some(3),
            None,
            metadata.schema.clone(),
            metadata.partition_spec.clone(),
        )
        .build_v2_data();
        writer.add_entry(expected_entries[0].clone()).unwrap();
        writer.add_delete_entry(expected_entries[1].clone()).unwrap();
        writer.add_existing_entry(expected_entries[2].clone()).unwrap();
        writer.write_manifest_file().await.unwrap();

        // Read the file back.
        let bytes = fs::read(manifest_path).expect("read_file must succeed");
        let actual_manifest = Manifest::parse_avro(bytes.as_slice()).unwrap();

        // Added and deleted entries take the writer's snapshot id; the existing entry keeps its own.
        for added_or_deleted in &mut expected_entries[..2] {
            added_or_deleted.snapshot_id = Some(3);
        }
        // Only `add_entry` clears the file sequence number; the deleted and existing entries
        // keep theirs.
        expected_entries[0].file_sequence_number = None;
        assert_eq!(actual_manifest, Manifest::new(metadata, expected_entries));
    }

    #[tokio::test]
    async fn test_v3_delete_manifest_delete_file_roundtrip() {
        let id_field = NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Long));
        let data_field = NestedField::optional(2, "data", Type::Primitive(PrimitiveType::String));
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![Arc::new(id_field), Arc::new(data_field)])
                .build()
                .unwrap(),
        );
        let spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap();

        // A freshly added position delete file with no snapshot or sequence numbers yet.
        let position_delete = DataFile {
            content: DataContentType::PositionDeletes,
            file_path: "s3://bucket/table/data/delete-00000.parquet".to_string(),
            file_format: DataFileFormat::Parquet,
            partition: Struct::empty(),
            record_count: 10,
            file_size_in_bytes: 1024,
            column_sizes: HashMap::new(),
            value_counts: HashMap::new(),
            null_value_counts: HashMap::new(),
            nan_value_counts: HashMap::new(),
            lower_bounds: HashMap::new(),
            upper_bounds: HashMap::new(),
            key_metadata: None,
            split_offsets: None,
            equality_ids: None,
            sort_order_id: None,
            partition_spec_id: 0,
            first_row_id: None,
            referenced_data_file: None,
            content_offset: None,
            content_size_in_bytes: None,
        };
        let delete_entry = ManifestEntry {
            status: ManifestStatus::Added,
            snapshot_id: None,
            sequence_number: None,
            file_sequence_number: None,
            data_file: position_delete,
        };

        // Write a V3 delete manifest.
        let tmp_dir = TempDir::new().unwrap();
        let manifest_path = tmp_dir.path().join("v3_delete_manifest.avro");
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let output_file = file_io.new_output(manifest_path.to_str().unwrap()).unwrap();
        let mut writer =
            ManifestWriterBuilder::new(output_file, Some(1), None, schema.clone(), spec.clone())
                .build_v3_deletes();
        writer.add_entry(delete_entry).unwrap();
        let manifest_file = writer.write_manifest_file().await.unwrap();

        // The returned ManifestFile reports Deletes content...
        assert_eq!(manifest_file.content, ManifestContentType::Deletes);

        // ...and so does the manifest read back from disk.
        let bytes = fs::read(&manifest_path).expect("read_file must succeed");
        let actual_manifest = Manifest::parse_avro(bytes.as_slice()).unwrap();
        assert_eq!(
            actual_manifest.metadata().content,
            ManifestContentType::Deletes,
        );
    }
}