iceberg/spec/manifest/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::cmp::min;
19use std::future::Future;
20use std::pin::Pin;
21
22use apache_avro::{Writer as AvroWriter, to_value};
23use bytes::Bytes;
24use itertools::Itertools;
25use serde_json::to_vec;
26
27use super::{
28    Datum, FormatVersion, ManifestContentType, PartitionSpec, PrimitiveType,
29    UNASSIGNED_SEQUENCE_NUMBER,
30};
31use crate::encryption::EncryptedOutputFile;
32use crate::error::Result;
33use crate::io::{FileWrite, OutputFile};
34use crate::spec::manifest::_serde::{ManifestEntryV1, ManifestEntryV2};
35use crate::spec::manifest::{manifest_schema_v1, manifest_schema_v2};
36use crate::spec::{
37    DataContentType, DataFile, FieldSummary, ManifestEntry, ManifestFile, ManifestMetadata,
38    ManifestStatus, PrimitiveLiteral, SchemaRef, StructType,
39};
40use crate::{Error, ErrorKind};
41
/// Sentinel reported as the manifest's added snapshot ID when the writer was created without
/// a snapshot ID. A manifest carrying this value must have the real snapshot ID substituted
/// before it is committed.
const UNASSIGNED_SNAPSHOT_ID: i64 = -1;
45
46type WriterFuture = Pin<Box<dyn Future<Output = Result<Box<dyn FileWrite>>> + Send>>;
47
48/// The builder used to create a [`ManifestWriter`].
49pub struct ManifestWriterBuilder {
50    writer_future: WriterFuture,
51    location: String,
52    snapshot_id: Option<i64>,
53    key_metadata: Option<Vec<u8>>,
54    schema: SchemaRef,
55    partition_spec: PartitionSpec,
56}
57
58impl ManifestWriterBuilder {
59    /// Create a new builder.
60    pub fn new(
61        output: OutputFile,
62        snapshot_id: Option<i64>,
63        key_metadata: Option<Vec<u8>>,
64        schema: SchemaRef,
65        partition_spec: PartitionSpec,
66    ) -> Self {
67        let location = output.location().to_owned();
68        Self {
69            writer_future: Box::pin(async move { output.writer().await }),
70            location,
71            snapshot_id,
72            key_metadata,
73            schema,
74            partition_spec,
75        }
76    }
77
78    /// Create a new builder from an [`EncryptedOutputFile`].
79    ///
80    /// Use this when writing manifests with transparent encryption.
81    pub fn new_from_encrypted(
82        encrypted_output: EncryptedOutputFile,
83        snapshot_id: Option<i64>,
84        key_metadata: Option<Vec<u8>>,
85        schema: SchemaRef,
86        partition_spec: PartitionSpec,
87    ) -> Self {
88        let location = encrypted_output.location().to_owned();
89        Self {
90            writer_future: Box::pin(async move { encrypted_output.writer().await }),
91            location,
92            snapshot_id,
93            key_metadata,
94            schema,
95            partition_spec,
96        }
97    }
98
99    /// Build a [`ManifestWriter`] for format version 1.
100    pub fn build_v1(self) -> ManifestWriter {
101        let metadata = ManifestMetadata::builder()
102            .schema_id(self.schema.schema_id())
103            .schema(self.schema)
104            .partition_spec(self.partition_spec)
105            .format_version(FormatVersion::V1)
106            .content(ManifestContentType::Data)
107            .build();
108        ManifestWriter::new(
109            self.writer_future,
110            self.location,
111            self.snapshot_id,
112            self.key_metadata,
113            metadata,
114            None,
115        )
116    }
117
118    /// Build a [`ManifestWriter`] for format version 2, data content.
119    pub fn build_v2_data(self) -> ManifestWriter {
120        let metadata = ManifestMetadata::builder()
121            .schema_id(self.schema.schema_id())
122            .schema(self.schema)
123            .partition_spec(self.partition_spec)
124            .format_version(FormatVersion::V2)
125            .content(ManifestContentType::Data)
126            .build();
127        ManifestWriter::new(
128            self.writer_future,
129            self.location,
130            self.snapshot_id,
131            self.key_metadata,
132            metadata,
133            None,
134        )
135    }
136
137    /// Build a [`ManifestWriter`] for format version 2, deletes content.
138    pub fn build_v2_deletes(self) -> ManifestWriter {
139        let metadata = ManifestMetadata::builder()
140            .schema_id(self.schema.schema_id())
141            .schema(self.schema)
142            .partition_spec(self.partition_spec)
143            .format_version(FormatVersion::V2)
144            .content(ManifestContentType::Deletes)
145            .build();
146        ManifestWriter::new(
147            self.writer_future,
148            self.location,
149            self.snapshot_id,
150            self.key_metadata,
151            metadata,
152            None,
153        )
154    }
155
156    /// Build a [`ManifestWriter`] for format version 2, data content.
157    pub fn build_v3_data(self) -> ManifestWriter {
158        let metadata = ManifestMetadata::builder()
159            .schema_id(self.schema.schema_id())
160            .schema(self.schema)
161            .partition_spec(self.partition_spec)
162            .format_version(FormatVersion::V3)
163            .content(ManifestContentType::Data)
164            .build();
165        ManifestWriter::new(
166            self.writer_future,
167            self.location,
168            self.snapshot_id,
169            self.key_metadata,
170            metadata,
171            // First row id is assigned by the [`ManifestListWriter`] when the manifest
172            // is added to the list.
173            None,
174        )
175    }
176
177    /// Build a [`ManifestWriter`] for format version 3, deletes content.
178    pub fn build_v3_deletes(self) -> ManifestWriter {
179        let metadata = ManifestMetadata::builder()
180            .schema_id(self.schema.schema_id())
181            .schema(self.schema)
182            .partition_spec(self.partition_spec)
183            .format_version(FormatVersion::V3)
184            .content(ManifestContentType::Deletes)
185            .build();
186        ManifestWriter::new(
187            self.writer_future,
188            self.location,
189            self.snapshot_id,
190            self.key_metadata,
191            metadata,
192            None,
193        )
194    }
195}
196
197/// A manifest writer.
198pub struct ManifestWriter {
199    writer_future: WriterFuture,
200    location: String,
201
202    snapshot_id: Option<i64>,
203
204    added_files: u32,
205    added_rows: u64,
206    existing_files: u32,
207    existing_rows: u64,
208    deleted_files: u32,
209    deleted_rows: u64,
210    first_row_id: Option<u64>,
211
212    min_seq_num: Option<i64>,
213
214    key_metadata: Option<Vec<u8>>,
215
216    manifest_entries: Vec<ManifestEntry>,
217
218    metadata: ManifestMetadata,
219}
220
221impl ManifestWriter {
222    /// Create a new manifest writer.
223    pub(crate) fn new(
224        writer_future: WriterFuture,
225        location: String,
226        snapshot_id: Option<i64>,
227        key_metadata: Option<Vec<u8>>,
228        metadata: ManifestMetadata,
229        first_row_id: Option<u64>,
230    ) -> Self {
231        Self {
232            writer_future,
233            location,
234            snapshot_id,
235            added_files: 0,
236            added_rows: 0,
237            existing_files: 0,
238            existing_rows: 0,
239            deleted_files: 0,
240            deleted_rows: 0,
241            first_row_id,
242            min_seq_num: None,
243            key_metadata,
244            manifest_entries: Vec::new(),
245            metadata,
246        }
247    }
248
249    fn construct_partition_summaries(
250        &mut self,
251        partition_type: &StructType,
252    ) -> Result<Vec<FieldSummary>> {
253        let mut field_stats: Vec<_> = partition_type
254            .fields()
255            .iter()
256            .map(|f| PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone()))
257            .collect();
258        for partition in self.manifest_entries.iter().map(|e| &e.data_file.partition) {
259            for (literal, stat) in partition.iter().zip_eq(field_stats.iter_mut()) {
260                let primitive_literal = literal.map(|v| v.as_primitive_literal().unwrap());
261                stat.update(primitive_literal)?;
262            }
263        }
264        Ok(field_stats.into_iter().map(|stat| stat.finish()).collect())
265    }
266
267    fn check_data_file(&self, data_file: &DataFile) -> Result<()> {
268        match self.metadata.content {
269            ManifestContentType::Data => {
270                if data_file.content != DataContentType::Data {
271                    return Err(Error::new(
272                        ErrorKind::DataInvalid,
273                        format!(
274                            "Date file at path {} with manifest content type `data`, should have DataContentType `Data`, but has `{:?}`",
275                            data_file.file_path(),
276                            data_file.content
277                        ),
278                    ));
279                }
280            }
281            ManifestContentType::Deletes => {
282                if data_file.content != DataContentType::EqualityDeletes
283                    && data_file.content != DataContentType::PositionDeletes
284                {
285                    return Err(Error::new(
286                        ErrorKind::DataInvalid,
287                        format!(
288                            "Date file at path {} with manifest content type `deletes`, should have DataContentType `Data`, but has `{:?}`",
289                            data_file.file_path(),
290                            data_file.content
291                        ),
292                    ));
293                }
294            }
295        }
296        Ok(())
297    }
298
299    /// Add a new manifest entry. This method will update following status of the entry:
300    /// - Update the entry status to `Added`
301    /// - Set the snapshot id to the current snapshot id
302    /// - Set the sequence number to `None` if it is invalid(smaller than 0)
303    /// - Set the file sequence number to `None`
304    pub(crate) fn add_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
305        self.check_data_file(&entry.data_file)?;
306        if entry.sequence_number().is_some_and(|n| n >= 0) {
307            entry.status = ManifestStatus::Added;
308            entry.snapshot_id = self.snapshot_id;
309            entry.file_sequence_number = None;
310        } else {
311            entry.status = ManifestStatus::Added;
312            entry.snapshot_id = self.snapshot_id;
313            entry.sequence_number = None;
314            entry.file_sequence_number = None;
315        };
316        self.add_entry_inner(entry)?;
317        Ok(())
318    }
319
320    /// Add file as an added entry with a specific sequence number. The entry's snapshot ID will be this manifest's snapshot ID. The entry's data sequence
321    /// number will be the provided data sequence number. The entry's file sequence number will be
322    /// assigned at commit.
323    pub fn add_file(&mut self, data_file: DataFile, sequence_number: i64) -> Result<()> {
324        self.check_data_file(&data_file)?;
325        let entry = ManifestEntry {
326            status: ManifestStatus::Added,
327            snapshot_id: self.snapshot_id,
328            sequence_number: (sequence_number >= 0).then_some(sequence_number),
329            file_sequence_number: None,
330            data_file,
331        };
332        self.add_entry_inner(entry)?;
333        Ok(())
334    }
335
336    /// Add a delete manifest entry. This method will update following status of the entry:
337    /// - Update the entry status to `Deleted`
338    /// - Set the snapshot id to the current snapshot id
339    ///
340    /// # TODO
341    /// Remove this allow later
342    #[allow(dead_code)]
343    pub(crate) fn add_delete_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
344        self.check_data_file(&entry.data_file)?;
345        entry.status = ManifestStatus::Deleted;
346        entry.snapshot_id = self.snapshot_id;
347        self.add_entry_inner(entry)?;
348        Ok(())
349    }
350
351    /// Add a file as delete manifest entry. The entry's snapshot ID will be this manifest's snapshot ID.
352    /// However, the original data and file sequence numbers of the file must be preserved when
353    /// the file is marked as deleted.
354    pub fn add_delete_file(
355        &mut self,
356        data_file: DataFile,
357        sequence_number: i64,
358        file_sequence_number: Option<i64>,
359    ) -> Result<()> {
360        self.check_data_file(&data_file)?;
361        let entry = ManifestEntry {
362            status: ManifestStatus::Deleted,
363            snapshot_id: self.snapshot_id,
364            sequence_number: Some(sequence_number),
365            file_sequence_number,
366            data_file,
367        };
368        self.add_entry_inner(entry)?;
369        Ok(())
370    }
371
372    /// Add an existing manifest entry. This method will update following status of the entry:
373    /// - Update the entry status to `Existing`
374    ///
375    /// # TODO
376    /// Remove this allow later
377    #[allow(dead_code)]
378    pub(crate) fn add_existing_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
379        self.check_data_file(&entry.data_file)?;
380        entry.status = ManifestStatus::Existing;
381        self.add_entry_inner(entry)?;
382        Ok(())
383    }
384
385    /// Add an file as existing manifest entry. The original data and file sequence numbers, snapshot ID,
386    /// which were assigned at commit, must be preserved when adding an existing entry.
387    pub fn add_existing_file(
388        &mut self,
389        data_file: DataFile,
390        snapshot_id: i64,
391        sequence_number: i64,
392        file_sequence_number: Option<i64>,
393    ) -> Result<()> {
394        self.check_data_file(&data_file)?;
395        let entry = ManifestEntry {
396            status: ManifestStatus::Existing,
397            snapshot_id: Some(snapshot_id),
398            sequence_number: Some(sequence_number),
399            file_sequence_number,
400            data_file,
401        };
402        self.add_entry_inner(entry)?;
403        Ok(())
404    }
405
406    fn add_entry_inner(&mut self, entry: ManifestEntry) -> Result<()> {
407        // Check if the entry has sequence number
408        if (entry.status == ManifestStatus::Deleted || entry.status == ManifestStatus::Existing)
409            && (entry.sequence_number.is_none() || entry.file_sequence_number.is_none())
410        {
411            return Err(Error::new(
412                ErrorKind::DataInvalid,
413                "Manifest entry with status Existing or Deleted should have sequence number",
414            ));
415        }
416
417        // Update the statistics
418        match entry.status {
419            ManifestStatus::Added => {
420                self.added_files += 1;
421                self.added_rows += entry.data_file.record_count;
422            }
423            ManifestStatus::Deleted => {
424                self.deleted_files += 1;
425                self.deleted_rows += entry.data_file.record_count;
426            }
427            ManifestStatus::Existing => {
428                self.existing_files += 1;
429                self.existing_rows += entry.data_file.record_count;
430            }
431        }
432        if entry.is_alive()
433            && let Some(seq_num) = entry.sequence_number
434        {
435            self.min_seq_num = Some(self.min_seq_num.map_or(seq_num, |v| min(v, seq_num)));
436        }
437        self.manifest_entries.push(entry);
438        Ok(())
439    }
440
441    /// Write manifest file and return it.
442    pub async fn write_manifest_file(mut self) -> Result<ManifestFile> {
443        // Create the avro writer
444        let partition_type = self
445            .metadata
446            .partition_spec
447            .partition_type(&self.metadata.schema)?;
448        let table_schema = &self.metadata.schema;
449        let avro_schema = match self.metadata.format_version {
450            FormatVersion::V1 => manifest_schema_v1(&partition_type)?,
451            // Manifest schema did not change between V2 and V3
452            FormatVersion::V2 | FormatVersion::V3 => manifest_schema_v2(&partition_type)?,
453        };
454        let mut avro_writer = AvroWriter::new(&avro_schema, Vec::new());
455        avro_writer.add_user_metadata(
456            "schema".to_string(),
457            to_vec(table_schema).map_err(|err| {
458                Error::new(ErrorKind::DataInvalid, "Fail to serialize table schema")
459                    .with_source(err)
460            })?,
461        )?;
462        avro_writer.add_user_metadata(
463            "schema-id".to_string(),
464            table_schema.schema_id().to_string(),
465        )?;
466        avro_writer.add_user_metadata(
467            "partition-spec".to_string(),
468            to_vec(&self.metadata.partition_spec.fields()).map_err(|err| {
469                Error::new(ErrorKind::DataInvalid, "Fail to serialize partition spec")
470                    .with_source(err)
471            })?,
472        )?;
473        avro_writer.add_user_metadata(
474            "partition-spec-id".to_string(),
475            self.metadata.partition_spec.spec_id().to_string(),
476        )?;
477        avro_writer.add_user_metadata(
478            "format-version".to_string(),
479            (self.metadata.format_version as u8).to_string(),
480        )?;
481        match self.metadata.format_version {
482            FormatVersion::V1 => {}
483            FormatVersion::V2 | FormatVersion::V3 => {
484                avro_writer
485                    .add_user_metadata("content".to_string(), self.metadata.content.to_string())?;
486            }
487        }
488
489        let partition_summary = self.construct_partition_summaries(&partition_type)?;
490        // Write manifest entries
491        for entry in std::mem::take(&mut self.manifest_entries) {
492            let value = match self.metadata.format_version {
493                FormatVersion::V1 => to_value(ManifestEntryV1::try_from(entry, &partition_type)?)?
494                    .resolve(&avro_schema)?,
495                // Manifest entry format did not change between V2 and V3
496                FormatVersion::V2 | FormatVersion::V3 => {
497                    to_value(ManifestEntryV2::try_from(entry, &partition_type)?)?
498                        .resolve(&avro_schema)?
499                }
500            };
501
502            avro_writer.append(value)?;
503        }
504
505        let content = avro_writer.into_inner()?;
506        let length = content.len();
507        let mut writer = self.writer_future.await?;
508        writer.write(Bytes::from(content)).await?;
509        writer.close().await?;
510
511        Ok(ManifestFile {
512            manifest_path: self.location,
513            manifest_length: length as i64,
514            partition_spec_id: self.metadata.partition_spec.spec_id(),
515            content: self.metadata.content,
516            // sequence_number and min_sequence_number with UNASSIGNED_SEQUENCE_NUMBER will be replace with
517            // real sequence number in `ManifestListWriter`.
518            sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
519            min_sequence_number: self.min_seq_num.unwrap_or(UNASSIGNED_SEQUENCE_NUMBER),
520            added_snapshot_id: self.snapshot_id.unwrap_or(UNASSIGNED_SNAPSHOT_ID),
521            added_files_count: Some(self.added_files),
522            existing_files_count: Some(self.existing_files),
523            deleted_files_count: Some(self.deleted_files),
524            added_rows_count: Some(self.added_rows),
525            existing_rows_count: Some(self.existing_rows),
526            deleted_rows_count: Some(self.deleted_rows),
527            partitions: Some(partition_summary),
528            key_metadata: self.key_metadata,
529            first_row_id: self.first_row_id,
530        })
531    }
532}
533
534struct PartitionFieldStats {
535    partition_type: PrimitiveType,
536
537    contains_null: bool,
538    contains_nan: Option<bool>,
539    lower_bound: Option<Datum>,
540    upper_bound: Option<Datum>,
541}
542
543impl PartitionFieldStats {
544    pub(crate) fn new(partition_type: PrimitiveType) -> Self {
545        Self {
546            partition_type,
547            contains_null: false,
548            contains_nan: Some(false),
549            upper_bound: None,
550            lower_bound: None,
551        }
552    }
553
554    pub(crate) fn update(&mut self, value: Option<PrimitiveLiteral>) -> Result<()> {
555        let Some(value) = value else {
556            self.contains_null = true;
557            return Ok(());
558        };
559        if !self.partition_type.compatible(&value) {
560            return Err(Error::new(
561                ErrorKind::DataInvalid,
562                "value is not compatible with type",
563            ));
564        }
565        let value = Datum::new(self.partition_type.clone(), value);
566
567        if value.is_nan() {
568            self.contains_nan = Some(true);
569            return Ok(());
570        }
571
572        self.lower_bound = Some(self.lower_bound.take().map_or(value.clone(), |original| {
573            if value < original {
574                value.clone()
575            } else {
576                original
577            }
578        }));
579        self.upper_bound = Some(self.upper_bound.take().map_or(value.clone(), |original| {
580            if value > original { value } else { original }
581        }));
582
583        Ok(())
584    }
585
586    pub(crate) fn finish(self) -> FieldSummary {
587        FieldSummary {
588            contains_null: self.contains_null,
589            contains_nan: self.contains_nan,
590            upper_bound: self.upper_bound.map(|v| v.to_bytes().unwrap()),
591            lower_bound: self.lower_bound.map(|v| v.to_bytes().unwrap()),
592        }
593    }
594}
595
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::fs;
    use std::sync::Arc;

    use tempfile::TempDir;

    use super::*;
    use crate::io::FileIO;
    use crate::spec::{DataFileFormat, Manifest, NestedField, PrimitiveType, Schema, Struct, Type};

    /// Unpartitioned Parquet data file shared by every entry in `test_add_delete_existing`.
    ///
    /// The entries in that test differ only in status, snapshot ID and sequence numbers, so they
    /// all reuse this identical data file.
    fn sample_data_file() -> DataFile {
        DataFile {
            content: DataContentType::Data,
            file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
            file_format: DataFileFormat::Parquet,
            partition: Struct::empty(),
            record_count: 1,
            file_size_in_bytes: 5442,
            column_sizes: HashMap::from([(1, 61), (2, 73)]),
            value_counts: HashMap::from([(1, 1), (2, 1)]),
            null_value_counts: HashMap::from([(1, 0), (2, 0)]),
            nan_value_counts: HashMap::new(),
            lower_bounds: HashMap::new(),
            upper_bounds: HashMap::new(),
            key_metadata: Some(Vec::new()),
            split_offsets: Some(vec![4]),
            equality_ids: None,
            sort_order_id: None,
            partition_spec_id: 0,
            first_row_id: None,
            referenced_data_file: None,
            content_offset: None,
            content_size_in_bytes: None,
        }
    }

    #[tokio::test]
    async fn test_add_delete_existing() {
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![
                    Arc::new(NestedField::optional(
                        1,
                        "id",
                        Type::Primitive(PrimitiveType::Int),
                    )),
                    Arc::new(NestedField::optional(
                        2,
                        "name",
                        Type::Primitive(PrimitiveType::String),
                    )),
                ])
                .build()
                .unwrap(),
        );
        let metadata = ManifestMetadata {
            schema_id: 0,
            schema: schema.clone(),
            partition_spec: PartitionSpec::builder(schema)
                .with_spec_id(0)
                .build()
                .unwrap(),
            content: ManifestContentType::Data,
            format_version: FormatVersion::V2,
        };
        let mut entries = vec![
            ManifestEntry {
                status: ManifestStatus::Added,
                snapshot_id: None,
                sequence_number: Some(1),
                file_sequence_number: Some(1),
                data_file: sample_data_file(),
            },
            ManifestEntry {
                status: ManifestStatus::Deleted,
                snapshot_id: Some(1),
                sequence_number: Some(1),
                file_sequence_number: Some(1),
                data_file: sample_data_file(),
            },
            ManifestEntry {
                status: ManifestStatus::Existing,
                snapshot_id: Some(1),
                sequence_number: Some(1),
                file_sequence_number: Some(1),
                data_file: sample_data_file(),
            },
        ];

        // write manifest to file
        let tmp_dir = TempDir::new().unwrap();
        let path = tmp_dir.path().join("test_manifest.avro");
        let io = FileIO::new_with_fs();
        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
        let mut writer = ManifestWriterBuilder::new(
            output_file,
            Some(3),
            None,
            metadata.schema.clone(),
            metadata.partition_spec.clone(),
        )
        .build_v2_data();
        writer.add_entry(entries[0].clone()).unwrap();
        writer.add_delete_entry(entries[1].clone()).unwrap();
        writer.add_existing_entry(entries[2].clone()).unwrap();
        writer.write_manifest_file().await.unwrap();

        // read back the manifest file and check the content
        let actual_manifest =
            Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
                .unwrap();

        // `add_entry` and `add_delete_entry` stamp the writer's snapshot ID onto the entry;
        // `add_existing_entry` keeps the entry's original snapshot ID.
        entries[0].snapshot_id = Some(3);
        entries[1].snapshot_id = Some(3);
        // `add_entry` clears the file sequence number of added entries; deleted and existing
        // entries keep theirs.
        entries[0].file_sequence_number = None;
        assert_eq!(actual_manifest, Manifest::new(metadata, entries));
    }

    #[tokio::test]
    async fn test_v3_delete_manifest_delete_file_roundtrip() {
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![
                    Arc::new(NestedField::optional(
                        1,
                        "id",
                        Type::Primitive(PrimitiveType::Long),
                    )),
                    Arc::new(NestedField::optional(
                        2,
                        "data",
                        Type::Primitive(PrimitiveType::String),
                    )),
                ])
                .build()
                .unwrap(),
        );

        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap();

        // Create a position delete file entry
        let delete_entry = ManifestEntry {
            status: ManifestStatus::Added,
            snapshot_id: None,
            sequence_number: None,
            file_sequence_number: None,
            data_file: DataFile {
                content: DataContentType::PositionDeletes,
                file_path: "s3://bucket/table/data/delete-00000.parquet".to_string(),
                file_format: DataFileFormat::Parquet,
                partition: Struct::empty(),
                record_count: 10,
                file_size_in_bytes: 1024,
                column_sizes: HashMap::new(),
                value_counts: HashMap::new(),
                null_value_counts: HashMap::new(),
                nan_value_counts: HashMap::new(),
                lower_bounds: HashMap::new(),
                upper_bounds: HashMap::new(),
                key_metadata: None,
                split_offsets: None,
                equality_ids: None,
                sort_order_id: None,
                partition_spec_id: 0,
                first_row_id: None,
                referenced_data_file: None,
                content_offset: None,
                content_size_in_bytes: None,
            },
        };

        // Write a V3 delete manifest
        let tmp_dir = TempDir::new().unwrap();
        let path = tmp_dir.path().join("v3_delete_manifest.avro");
        let io = FileIO::new_with_fs();
        let output_file = io.new_output(path.to_str().unwrap()).unwrap();

        let mut writer = ManifestWriterBuilder::new(
            output_file,
            Some(1),
            None,
            schema.clone(),
            partition_spec.clone(),
        )
        .build_v3_deletes();

        writer.add_entry(delete_entry).unwrap();
        let manifest_file = writer.write_manifest_file().await.unwrap();

        // The returned ManifestFile correctly reports Deletes content
        assert_eq!(manifest_file.content, ManifestContentType::Deletes);

        // Read back the manifest file
        let actual_manifest =
            Manifest::parse_avro(fs::read(&path).expect("read_file must succeed").as_slice())
                .unwrap();

        // Verify the content type is correctly preserved as Deletes
        assert_eq!(
            actual_manifest.metadata().content,
            ManifestContentType::Deletes,
        );
    }
}
846}