iceberg/spec/manifest/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::cmp::min;
19
20use apache_avro::{Writer as AvroWriter, to_value};
21use bytes::Bytes;
22use itertools::Itertools;
23use serde_json::to_vec;
24
25use super::{
26    Datum, FormatVersion, ManifestContentType, PartitionSpec, PrimitiveType,
27    UNASSIGNED_SEQUENCE_NUMBER,
28};
29use crate::error::Result;
30use crate::io::OutputFile;
31use crate::spec::manifest::_serde::{ManifestEntryV1, ManifestEntryV2};
32use crate::spec::manifest::{manifest_schema_v1, manifest_schema_v2};
33use crate::spec::{
34    DataContentType, DataFile, FieldSummary, ManifestEntry, ManifestFile, ManifestMetadata,
35    ManifestStatus, PrimitiveLiteral, SchemaRef, StructType,
36};
37use crate::{Error, ErrorKind};
38
/// Sentinel snapshot id used when a manifest is written before its snapshot id is known.
///
/// Any field carrying this value has to be overwritten with the real snapshot id before the
/// snapshot is committed.
const UNASSIGNED_SNAPSHOT_ID: i64 = -1;
42
43/// The builder used to create a [`ManifestWriter`].
44pub struct ManifestWriterBuilder {
45    output: OutputFile,
46    snapshot_id: Option<i64>,
47    key_metadata: Option<Vec<u8>>,
48    schema: SchemaRef,
49    partition_spec: PartitionSpec,
50}
51
52impl ManifestWriterBuilder {
53    /// Create a new builder.
54    pub fn new(
55        output: OutputFile,
56        snapshot_id: Option<i64>,
57        key_metadata: Option<Vec<u8>>,
58        schema: SchemaRef,
59        partition_spec: PartitionSpec,
60    ) -> Self {
61        Self {
62            output,
63            snapshot_id,
64            key_metadata,
65            schema,
66            partition_spec,
67        }
68    }
69
70    /// Build a [`ManifestWriter`] for format version 1.
71    pub fn build_v1(self) -> ManifestWriter {
72        let metadata = ManifestMetadata::builder()
73            .schema_id(self.schema.schema_id())
74            .schema(self.schema)
75            .partition_spec(self.partition_spec)
76            .format_version(FormatVersion::V1)
77            .content(ManifestContentType::Data)
78            .build();
79        ManifestWriter::new(
80            self.output,
81            self.snapshot_id,
82            self.key_metadata,
83            metadata,
84            None,
85        )
86    }
87
88    /// Build a [`ManifestWriter`] for format version 2, data content.
89    pub fn build_v2_data(self) -> ManifestWriter {
90        let metadata = ManifestMetadata::builder()
91            .schema_id(self.schema.schema_id())
92            .schema(self.schema)
93            .partition_spec(self.partition_spec)
94            .format_version(FormatVersion::V2)
95            .content(ManifestContentType::Data)
96            .build();
97        ManifestWriter::new(
98            self.output,
99            self.snapshot_id,
100            self.key_metadata,
101            metadata,
102            None,
103        )
104    }
105
106    /// Build a [`ManifestWriter`] for format version 2, deletes content.
107    pub fn build_v2_deletes(self) -> ManifestWriter {
108        let metadata = ManifestMetadata::builder()
109            .schema_id(self.schema.schema_id())
110            .schema(self.schema)
111            .partition_spec(self.partition_spec)
112            .format_version(FormatVersion::V2)
113            .content(ManifestContentType::Deletes)
114            .build();
115        ManifestWriter::new(
116            self.output,
117            self.snapshot_id,
118            self.key_metadata,
119            metadata,
120            None,
121        )
122    }
123
124    /// Build a [`ManifestWriter`] for format version 2, data content.
125    pub fn build_v3_data(self) -> ManifestWriter {
126        let metadata = ManifestMetadata::builder()
127            .schema_id(self.schema.schema_id())
128            .schema(self.schema)
129            .partition_spec(self.partition_spec)
130            .format_version(FormatVersion::V3)
131            .content(ManifestContentType::Data)
132            .build();
133        ManifestWriter::new(
134            self.output,
135            self.snapshot_id,
136            self.key_metadata,
137            metadata,
138            // First row id is assigned by the [`ManifestListWriter`] when the manifest
139            // is added to the list.
140            None,
141        )
142    }
143
144    /// Build a [`ManifestWriter`] for format version 3, deletes content.
145    pub fn build_v3_deletes(self) -> ManifestWriter {
146        let metadata = ManifestMetadata::builder()
147            .schema_id(self.schema.schema_id())
148            .schema(self.schema)
149            .partition_spec(self.partition_spec)
150            .format_version(FormatVersion::V3)
151            .content(ManifestContentType::Deletes)
152            .build();
153        ManifestWriter::new(
154            self.output,
155            self.snapshot_id,
156            self.key_metadata,
157            metadata,
158            None,
159        )
160    }
161}
162
163/// A manifest writer.
164pub struct ManifestWriter {
165    output: OutputFile,
166
167    snapshot_id: Option<i64>,
168
169    added_files: u32,
170    added_rows: u64,
171    existing_files: u32,
172    existing_rows: u64,
173    deleted_files: u32,
174    deleted_rows: u64,
175    first_row_id: Option<u64>,
176
177    min_seq_num: Option<i64>,
178
179    key_metadata: Option<Vec<u8>>,
180
181    manifest_entries: Vec<ManifestEntry>,
182
183    metadata: ManifestMetadata,
184}
185
186impl ManifestWriter {
187    /// Create a new manifest writer.
188    pub(crate) fn new(
189        output: OutputFile,
190        snapshot_id: Option<i64>,
191        key_metadata: Option<Vec<u8>>,
192        metadata: ManifestMetadata,
193        first_row_id: Option<u64>,
194    ) -> Self {
195        Self {
196            output,
197            snapshot_id,
198            added_files: 0,
199            added_rows: 0,
200            existing_files: 0,
201            existing_rows: 0,
202            deleted_files: 0,
203            deleted_rows: 0,
204            first_row_id,
205            min_seq_num: None,
206            key_metadata,
207            manifest_entries: Vec::new(),
208            metadata,
209        }
210    }
211
212    fn construct_partition_summaries(
213        &mut self,
214        partition_type: &StructType,
215    ) -> Result<Vec<FieldSummary>> {
216        let mut field_stats: Vec<_> = partition_type
217            .fields()
218            .iter()
219            .map(|f| PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone()))
220            .collect();
221        for partition in self.manifest_entries.iter().map(|e| &e.data_file.partition) {
222            for (literal, stat) in partition.iter().zip_eq(field_stats.iter_mut()) {
223                let primitive_literal = literal.map(|v| v.as_primitive_literal().unwrap());
224                stat.update(primitive_literal)?;
225            }
226        }
227        Ok(field_stats.into_iter().map(|stat| stat.finish()).collect())
228    }
229
230    fn check_data_file(&self, data_file: &DataFile) -> Result<()> {
231        match self.metadata.content {
232            ManifestContentType::Data => {
233                if data_file.content != DataContentType::Data {
234                    return Err(Error::new(
235                        ErrorKind::DataInvalid,
236                        format!(
237                            "Date file at path {} with manifest content type `data`, should have DataContentType `Data`, but has `{:?}`",
238                            data_file.file_path(),
239                            data_file.content
240                        ),
241                    ));
242                }
243            }
244            ManifestContentType::Deletes => {
245                if data_file.content != DataContentType::EqualityDeletes
246                    && data_file.content != DataContentType::PositionDeletes
247                {
248                    return Err(Error::new(
249                        ErrorKind::DataInvalid,
250                        format!(
251                            "Date file at path {} with manifest content type `deletes`, should have DataContentType `Data`, but has `{:?}`",
252                            data_file.file_path(),
253                            data_file.content
254                        ),
255                    ));
256                }
257            }
258        }
259        Ok(())
260    }
261
262    /// Add a new manifest entry. This method will update following status of the entry:
263    /// - Update the entry status to `Added`
264    /// - Set the snapshot id to the current snapshot id
265    /// - Set the sequence number to `None` if it is invalid(smaller than 0)
266    /// - Set the file sequence number to `None`
267    pub(crate) fn add_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
268        self.check_data_file(&entry.data_file)?;
269        if entry.sequence_number().is_some_and(|n| n >= 0) {
270            entry.status = ManifestStatus::Added;
271            entry.snapshot_id = self.snapshot_id;
272            entry.file_sequence_number = None;
273        } else {
274            entry.status = ManifestStatus::Added;
275            entry.snapshot_id = self.snapshot_id;
276            entry.sequence_number = None;
277            entry.file_sequence_number = None;
278        };
279        self.add_entry_inner(entry)?;
280        Ok(())
281    }
282
283    /// Add file as an added entry with a specific sequence number. The entry's snapshot ID will be this manifest's snapshot ID. The entry's data sequence
284    /// number will be the provided data sequence number. The entry's file sequence number will be
285    /// assigned at commit.
286    pub fn add_file(&mut self, data_file: DataFile, sequence_number: i64) -> Result<()> {
287        self.check_data_file(&data_file)?;
288        let entry = ManifestEntry {
289            status: ManifestStatus::Added,
290            snapshot_id: self.snapshot_id,
291            sequence_number: (sequence_number >= 0).then_some(sequence_number),
292            file_sequence_number: None,
293            data_file,
294        };
295        self.add_entry_inner(entry)?;
296        Ok(())
297    }
298
299    /// Add a delete manifest entry. This method will update following status of the entry:
300    /// - Update the entry status to `Deleted`
301    /// - Set the snapshot id to the current snapshot id
302    ///
303    /// # TODO
304    /// Remove this allow later
305    #[allow(dead_code)]
306    pub(crate) fn add_delete_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
307        self.check_data_file(&entry.data_file)?;
308        entry.status = ManifestStatus::Deleted;
309        entry.snapshot_id = self.snapshot_id;
310        self.add_entry_inner(entry)?;
311        Ok(())
312    }
313
314    /// Add a file as delete manifest entry. The entry's snapshot ID will be this manifest's snapshot ID.
315    /// However, the original data and file sequence numbers of the file must be preserved when
316    /// the file is marked as deleted.
317    pub fn add_delete_file(
318        &mut self,
319        data_file: DataFile,
320        sequence_number: i64,
321        file_sequence_number: Option<i64>,
322    ) -> Result<()> {
323        self.check_data_file(&data_file)?;
324        let entry = ManifestEntry {
325            status: ManifestStatus::Deleted,
326            snapshot_id: self.snapshot_id,
327            sequence_number: Some(sequence_number),
328            file_sequence_number,
329            data_file,
330        };
331        self.add_entry_inner(entry)?;
332        Ok(())
333    }
334
335    /// Add an existing manifest entry. This method will update following status of the entry:
336    /// - Update the entry status to `Existing`
337    ///
338    /// # TODO
339    /// Remove this allow later
340    #[allow(dead_code)]
341    pub(crate) fn add_existing_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
342        self.check_data_file(&entry.data_file)?;
343        entry.status = ManifestStatus::Existing;
344        self.add_entry_inner(entry)?;
345        Ok(())
346    }
347
348    /// Add an file as existing manifest entry. The original data and file sequence numbers, snapshot ID,
349    /// which were assigned at commit, must be preserved when adding an existing entry.
350    pub fn add_existing_file(
351        &mut self,
352        data_file: DataFile,
353        snapshot_id: i64,
354        sequence_number: i64,
355        file_sequence_number: Option<i64>,
356    ) -> Result<()> {
357        self.check_data_file(&data_file)?;
358        let entry = ManifestEntry {
359            status: ManifestStatus::Existing,
360            snapshot_id: Some(snapshot_id),
361            sequence_number: Some(sequence_number),
362            file_sequence_number,
363            data_file,
364        };
365        self.add_entry_inner(entry)?;
366        Ok(())
367    }
368
369    fn add_entry_inner(&mut self, entry: ManifestEntry) -> Result<()> {
370        // Check if the entry has sequence number
371        if (entry.status == ManifestStatus::Deleted || entry.status == ManifestStatus::Existing)
372            && (entry.sequence_number.is_none() || entry.file_sequence_number.is_none())
373        {
374            return Err(Error::new(
375                ErrorKind::DataInvalid,
376                "Manifest entry with status Existing or Deleted should have sequence number",
377            ));
378        }
379
380        // Update the statistics
381        match entry.status {
382            ManifestStatus::Added => {
383                self.added_files += 1;
384                self.added_rows += entry.data_file.record_count;
385            }
386            ManifestStatus::Deleted => {
387                self.deleted_files += 1;
388                self.deleted_rows += entry.data_file.record_count;
389            }
390            ManifestStatus::Existing => {
391                self.existing_files += 1;
392                self.existing_rows += entry.data_file.record_count;
393            }
394        }
395        if entry.is_alive()
396            && let Some(seq_num) = entry.sequence_number
397        {
398            self.min_seq_num = Some(self.min_seq_num.map_or(seq_num, |v| min(v, seq_num)));
399        }
400        self.manifest_entries.push(entry);
401        Ok(())
402    }
403
404    /// Write manifest file and return it.
405    pub async fn write_manifest_file(mut self) -> Result<ManifestFile> {
406        // Create the avro writer
407        let partition_type = self
408            .metadata
409            .partition_spec
410            .partition_type(&self.metadata.schema)?;
411        let table_schema = &self.metadata.schema;
412        let avro_schema = match self.metadata.format_version {
413            FormatVersion::V1 => manifest_schema_v1(&partition_type)?,
414            // Manifest schema did not change between V2 and V3
415            FormatVersion::V2 | FormatVersion::V3 => manifest_schema_v2(&partition_type)?,
416        };
417        let mut avro_writer = AvroWriter::new(&avro_schema, Vec::new());
418        avro_writer.add_user_metadata(
419            "schema".to_string(),
420            to_vec(table_schema).map_err(|err| {
421                Error::new(ErrorKind::DataInvalid, "Fail to serialize table schema")
422                    .with_source(err)
423            })?,
424        )?;
425        avro_writer.add_user_metadata(
426            "schema-id".to_string(),
427            table_schema.schema_id().to_string(),
428        )?;
429        avro_writer.add_user_metadata(
430            "partition-spec".to_string(),
431            to_vec(&self.metadata.partition_spec.fields()).map_err(|err| {
432                Error::new(ErrorKind::DataInvalid, "Fail to serialize partition spec")
433                    .with_source(err)
434            })?,
435        )?;
436        avro_writer.add_user_metadata(
437            "partition-spec-id".to_string(),
438            self.metadata.partition_spec.spec_id().to_string(),
439        )?;
440        avro_writer.add_user_metadata(
441            "format-version".to_string(),
442            (self.metadata.format_version as u8).to_string(),
443        )?;
444        match self.metadata.format_version {
445            FormatVersion::V1 => {}
446            FormatVersion::V2 | FormatVersion::V3 => {
447                avro_writer
448                    .add_user_metadata("content".to_string(), self.metadata.content.to_string())?;
449            }
450        }
451
452        let partition_summary = self.construct_partition_summaries(&partition_type)?;
453        // Write manifest entries
454        for entry in std::mem::take(&mut self.manifest_entries) {
455            let value = match self.metadata.format_version {
456                FormatVersion::V1 => to_value(ManifestEntryV1::try_from(entry, &partition_type)?)?
457                    .resolve(&avro_schema)?,
458                // Manifest entry format did not change between V2 and V3
459                FormatVersion::V2 | FormatVersion::V3 => {
460                    to_value(ManifestEntryV2::try_from(entry, &partition_type)?)?
461                        .resolve(&avro_schema)?
462                }
463            };
464
465            avro_writer.append(value)?;
466        }
467
468        let content = avro_writer.into_inner()?;
469        let length = content.len();
470        self.output.write(Bytes::from(content)).await?;
471
472        Ok(ManifestFile {
473            manifest_path: self.output.location().to_string(),
474            manifest_length: length as i64,
475            partition_spec_id: self.metadata.partition_spec.spec_id(),
476            content: self.metadata.content,
477            // sequence_number and min_sequence_number with UNASSIGNED_SEQUENCE_NUMBER will be replace with
478            // real sequence number in `ManifestListWriter`.
479            sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
480            min_sequence_number: self.min_seq_num.unwrap_or(UNASSIGNED_SEQUENCE_NUMBER),
481            added_snapshot_id: self.snapshot_id.unwrap_or(UNASSIGNED_SNAPSHOT_ID),
482            added_files_count: Some(self.added_files),
483            existing_files_count: Some(self.existing_files),
484            deleted_files_count: Some(self.deleted_files),
485            added_rows_count: Some(self.added_rows),
486            existing_rows_count: Some(self.existing_rows),
487            deleted_rows_count: Some(self.deleted_rows),
488            partitions: Some(partition_summary),
489            key_metadata: self.key_metadata,
490            first_row_id: self.first_row_id,
491        })
492    }
493}
494
495struct PartitionFieldStats {
496    partition_type: PrimitiveType,
497
498    contains_null: bool,
499    contains_nan: Option<bool>,
500    lower_bound: Option<Datum>,
501    upper_bound: Option<Datum>,
502}
503
504impl PartitionFieldStats {
505    pub(crate) fn new(partition_type: PrimitiveType) -> Self {
506        Self {
507            partition_type,
508            contains_null: false,
509            contains_nan: Some(false),
510            upper_bound: None,
511            lower_bound: None,
512        }
513    }
514
515    pub(crate) fn update(&mut self, value: Option<PrimitiveLiteral>) -> Result<()> {
516        let Some(value) = value else {
517            self.contains_null = true;
518            return Ok(());
519        };
520        if !self.partition_type.compatible(&value) {
521            return Err(Error::new(
522                ErrorKind::DataInvalid,
523                "value is not compatible with type",
524            ));
525        }
526        let value = Datum::new(self.partition_type.clone(), value);
527
528        if value.is_nan() {
529            self.contains_nan = Some(true);
530            return Ok(());
531        }
532
533        self.lower_bound = Some(self.lower_bound.take().map_or(value.clone(), |original| {
534            if value < original {
535                value.clone()
536            } else {
537                original
538            }
539        }));
540        self.upper_bound = Some(self.upper_bound.take().map_or(value.clone(), |original| {
541            if value > original { value } else { original }
542        }));
543
544        Ok(())
545    }
546
547    pub(crate) fn finish(self) -> FieldSummary {
548        FieldSummary {
549            contains_null: self.contains_null,
550            contains_nan: self.contains_nan,
551            upper_bound: self.upper_bound.map(|v| v.to_bytes().unwrap()),
552            lower_bound: self.lower_bound.map(|v| v.to_bytes().unwrap()),
553        }
554    }
555}
556
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::fs;
    use std::sync::Arc;

    use tempfile::TempDir;

    use super::*;
    use crate::io::FileIO;
    use crate::spec::{DataFileFormat, Manifest, NestedField, PrimitiveType, Schema, Struct, Type};

    /// The unpartitioned Parquet data file referenced by every entry in
    /// `test_add_delete_existing`.
    fn sample_data_file() -> DataFile {
        DataFile {
            content: DataContentType::Data,
            file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
            file_format: DataFileFormat::Parquet,
            partition: Struct::empty(),
            record_count: 1,
            file_size_in_bytes: 5442,
            column_sizes: HashMap::from([(1, 61), (2, 73)]),
            value_counts: HashMap::from([(1, 1), (2, 1)]),
            null_value_counts: HashMap::from([(1, 0), (2, 0)]),
            nan_value_counts: HashMap::new(),
            lower_bounds: HashMap::new(),
            upper_bounds: HashMap::new(),
            key_metadata: Some(Vec::new()),
            split_offsets: Some(vec![4]),
            equality_ids: None,
            sort_order_id: None,
            partition_spec_id: 0,
            first_row_id: None,
            referenced_data_file: None,
            content_offset: None,
            content_size_in_bytes: None,
        }
    }

    /// Wrap [`sample_data_file`] in an entry with the given status and snapshot id.
    ///
    /// Both sequence numbers are fixed at `1`.
    fn sample_entry(status: ManifestStatus, snapshot_id: Option<i64>) -> ManifestEntry {
        ManifestEntry {
            status,
            snapshot_id,
            sequence_number: Some(1),
            file_sequence_number: Some(1),
            data_file: sample_data_file(),
        }
    }

    #[tokio::test]
    async fn test_add_delete_existing() {
        let table_schema = Arc::new(
            Schema::builder()
                .with_fields(vec![
                    Arc::new(NestedField::optional(
                        1,
                        "id",
                        Type::Primitive(PrimitiveType::Int),
                    )),
                    Arc::new(NestedField::optional(
                        2,
                        "name",
                        Type::Primitive(PrimitiveType::String),
                    )),
                ])
                .build()
                .unwrap(),
        );
        let expected_metadata = ManifestMetadata {
            schema_id: 0,
            schema: table_schema.clone(),
            partition_spec: PartitionSpec::builder(table_schema)
                .with_spec_id(0)
                .build()
                .unwrap(),
            content: ManifestContentType::Data,
            format_version: FormatVersion::V2,
        };
        let mut expected_entries = vec![
            sample_entry(ManifestStatus::Added, None),
            sample_entry(ManifestStatus::Deleted, Some(1)),
            sample_entry(ManifestStatus::Existing, Some(1)),
        ];

        // Write one entry of each kind into a fresh V2 data manifest.
        let workdir = TempDir::new().unwrap();
        let manifest_path = workdir.path().join("test_manifest.avro");
        let file_io = FileIO::new_with_fs();
        let output_file = file_io.new_output(manifest_path.to_str().unwrap()).unwrap();
        let mut writer = ManifestWriterBuilder::new(
            output_file,
            Some(3),
            None,
            expected_metadata.schema.clone(),
            expected_metadata.partition_spec.clone(),
        )
        .build_v2_data();
        writer.add_entry(expected_entries[0].clone()).unwrap();
        writer.add_delete_entry(expected_entries[1].clone()).unwrap();
        writer.add_existing_entry(expected_entries[2].clone()).unwrap();
        writer.write_manifest_file().await.unwrap();

        // Parse the written bytes back into a manifest.
        let bytes = fs::read(manifest_path).expect("read_file must succeed");
        let parsed = Manifest::parse_avro(bytes.as_slice()).unwrap();

        // Added and deleted entries take the writer's snapshot id; existing ones keep theirs.
        expected_entries[0].snapshot_id = Some(3);
        expected_entries[1].snapshot_id = Some(3);
        // Adding an entry clears its file sequence number.
        expected_entries[0].file_sequence_number = None;
        assert_eq!(parsed, Manifest::new(expected_metadata, expected_entries));
    }

    #[tokio::test]
    async fn test_v3_delete_manifest_delete_file_roundtrip() {
        let table_schema = Arc::new(
            Schema::builder()
                .with_fields(vec![
                    Arc::new(NestedField::optional(
                        1,
                        "id",
                        Type::Primitive(PrimitiveType::Long),
                    )),
                    Arc::new(NestedField::optional(
                        2,
                        "data",
                        Type::Primitive(PrimitiveType::String),
                    )),
                ])
                .build()
                .unwrap(),
        );

        let spec = PartitionSpec::builder(table_schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap();

        // A single position-delete file with no sequence numbers assigned yet.
        let position_delete = ManifestEntry {
            status: ManifestStatus::Added,
            snapshot_id: None,
            sequence_number: None,
            file_sequence_number: None,
            data_file: DataFile {
                content: DataContentType::PositionDeletes,
                file_path: "s3://bucket/table/data/delete-00000.parquet".to_string(),
                file_format: DataFileFormat::Parquet,
                partition: Struct::empty(),
                record_count: 10,
                file_size_in_bytes: 1024,
                column_sizes: HashMap::new(),
                value_counts: HashMap::new(),
                null_value_counts: HashMap::new(),
                nan_value_counts: HashMap::new(),
                lower_bounds: HashMap::new(),
                upper_bounds: HashMap::new(),
                key_metadata: None,
                split_offsets: None,
                equality_ids: None,
                sort_order_id: None,
                partition_spec_id: 0,
                first_row_id: None,
                referenced_data_file: None,
                content_offset: None,
                content_size_in_bytes: None,
            },
        };

        let workdir = TempDir::new().unwrap();
        let manifest_path = workdir.path().join("v3_delete_manifest.avro");
        let file_io = FileIO::new_with_fs();
        let output_file = file_io.new_output(manifest_path.to_str().unwrap()).unwrap();

        let mut writer = ManifestWriterBuilder::new(
            output_file,
            Some(1),
            None,
            table_schema.clone(),
            spec.clone(),
        )
        .build_v3_deletes();

        writer.add_entry(position_delete).unwrap();
        let written = writer.write_manifest_file().await.unwrap();

        // The in-memory descriptor must report delete content...
        assert_eq!(written.content, ManifestContentType::Deletes);

        // ...and so must the metadata parsed back from disk.
        let bytes = fs::read(&manifest_path).expect("read_file must succeed");
        let parsed = Manifest::parse_avro(bytes.as_slice()).unwrap();
        assert_eq!(parsed.metadata().content, ManifestContentType::Deletes,);
    }
}
805        );
806    }
807}