1use std::cmp::min;
19
20use apache_avro::{Writer as AvroWriter, to_value};
21use bytes::Bytes;
22use itertools::Itertools;
23use serde_json::to_vec;
24
25use super::{
26 Datum, FormatVersion, ManifestContentType, PartitionSpec, PrimitiveType,
27 UNASSIGNED_SEQUENCE_NUMBER,
28};
29use crate::error::Result;
30use crate::io::OutputFile;
31use crate::spec::manifest::_serde::{ManifestEntryV1, ManifestEntryV2};
32use crate::spec::manifest::{manifest_schema_v1, manifest_schema_v2};
33use crate::spec::{
34 DataContentType, DataFile, FieldSummary, ManifestEntry, ManifestFile, ManifestMetadata,
35 ManifestStatus, PrimitiveLiteral, SchemaRef, StructType, UNASSIGNED_SNAPSHOT_ID,
36};
37use crate::{Error, ErrorKind};
38
39pub struct ManifestWriterBuilder {
41 output: OutputFile,
42 snapshot_id: Option<i64>,
43 key_metadata: Option<Vec<u8>>,
44 schema: SchemaRef,
45 partition_spec: PartitionSpec,
46}
47
48impl ManifestWriterBuilder {
49 pub fn new(
51 output: OutputFile,
52 snapshot_id: Option<i64>,
53 key_metadata: Option<Vec<u8>>,
54 schema: SchemaRef,
55 partition_spec: PartitionSpec,
56 ) -> Self {
57 Self {
58 output,
59 snapshot_id,
60 key_metadata,
61 schema,
62 partition_spec,
63 }
64 }
65
66 pub fn build_v1(self) -> ManifestWriter {
68 let metadata = ManifestMetadata::builder()
69 .schema_id(self.schema.schema_id())
70 .schema(self.schema)
71 .partition_spec(self.partition_spec)
72 .format_version(FormatVersion::V1)
73 .content(ManifestContentType::Data)
74 .build();
75 ManifestWriter::new(
76 self.output,
77 self.snapshot_id,
78 self.key_metadata,
79 metadata,
80 None,
81 )
82 }
83
84 pub fn build_v2_data(self) -> ManifestWriter {
86 let metadata = ManifestMetadata::builder()
87 .schema_id(self.schema.schema_id())
88 .schema(self.schema)
89 .partition_spec(self.partition_spec)
90 .format_version(FormatVersion::V2)
91 .content(ManifestContentType::Data)
92 .build();
93 ManifestWriter::new(
94 self.output,
95 self.snapshot_id,
96 self.key_metadata,
97 metadata,
98 None,
99 )
100 }
101
102 pub fn build_v2_deletes(self) -> ManifestWriter {
104 let metadata = ManifestMetadata::builder()
105 .schema_id(self.schema.schema_id())
106 .schema(self.schema)
107 .partition_spec(self.partition_spec)
108 .format_version(FormatVersion::V2)
109 .content(ManifestContentType::Deletes)
110 .build();
111 ManifestWriter::new(
112 self.output,
113 self.snapshot_id,
114 self.key_metadata,
115 metadata,
116 None,
117 )
118 }
119
120 pub fn build_v3_data(self) -> ManifestWriter {
122 let metadata = ManifestMetadata::builder()
123 .schema_id(self.schema.schema_id())
124 .schema(self.schema)
125 .partition_spec(self.partition_spec)
126 .format_version(FormatVersion::V3)
127 .content(ManifestContentType::Data)
128 .build();
129 ManifestWriter::new(
130 self.output,
131 self.snapshot_id,
132 self.key_metadata,
133 metadata,
134 None,
137 )
138 }
139
140 pub fn build_v3_deletes(self) -> ManifestWriter {
142 let metadata = ManifestMetadata::builder()
143 .schema_id(self.schema.schema_id())
144 .schema(self.schema)
145 .partition_spec(self.partition_spec)
146 .format_version(FormatVersion::V3)
147 .content(ManifestContentType::Deletes)
148 .build();
149 ManifestWriter::new(
150 self.output,
151 self.snapshot_id,
152 self.key_metadata,
153 metadata,
154 None,
155 )
156 }
157}
158
159pub struct ManifestWriter {
161 output: OutputFile,
162
163 snapshot_id: Option<i64>,
164
165 added_files: u32,
166 added_rows: u64,
167 existing_files: u32,
168 existing_rows: u64,
169 deleted_files: u32,
170 deleted_rows: u64,
171 first_row_id: Option<u64>,
172
173 min_seq_num: Option<i64>,
174
175 key_metadata: Option<Vec<u8>>,
176
177 manifest_entries: Vec<ManifestEntry>,
178
179 metadata: ManifestMetadata,
180}
181
182impl ManifestWriter {
183 pub(crate) fn new(
185 output: OutputFile,
186 snapshot_id: Option<i64>,
187 key_metadata: Option<Vec<u8>>,
188 metadata: ManifestMetadata,
189 first_row_id: Option<u64>,
190 ) -> Self {
191 Self {
192 output,
193 snapshot_id,
194 added_files: 0,
195 added_rows: 0,
196 existing_files: 0,
197 existing_rows: 0,
198 deleted_files: 0,
199 deleted_rows: 0,
200 first_row_id,
201 min_seq_num: None,
202 key_metadata,
203 manifest_entries: Vec::new(),
204 metadata,
205 }
206 }
207
208 fn construct_partition_summaries(
209 &mut self,
210 partition_type: &StructType,
211 ) -> Result<Vec<FieldSummary>> {
212 let mut field_stats: Vec<_> = partition_type
213 .fields()
214 .iter()
215 .map(|f| PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone()))
216 .collect();
217 for partition in self.manifest_entries.iter().map(|e| &e.data_file.partition) {
218 for (literal, stat) in partition.iter().zip_eq(field_stats.iter_mut()) {
219 let primitive_literal = literal.map(|v| v.as_primitive_literal().unwrap());
220 stat.update(primitive_literal)?;
221 }
222 }
223 Ok(field_stats.into_iter().map(|stat| stat.finish()).collect())
224 }
225
226 fn check_data_file(&self, data_file: &DataFile) -> Result<()> {
227 match self.metadata.content {
228 ManifestContentType::Data => {
229 if data_file.content != DataContentType::Data {
230 return Err(Error::new(
231 ErrorKind::DataInvalid,
232 format!(
233 "Date file at path {} with manifest content type `data`, should have DataContentType `Data`, but has `{:?}`",
234 data_file.file_path(),
235 data_file.content
236 ),
237 ));
238 }
239 }
240 ManifestContentType::Deletes => {
241 if data_file.content != DataContentType::EqualityDeletes
242 && data_file.content != DataContentType::PositionDeletes
243 {
244 return Err(Error::new(
245 ErrorKind::DataInvalid,
246 format!(
247 "Date file at path {} with manifest content type `deletes`, should have DataContentType `Data`, but has `{:?}`",
248 data_file.file_path(),
249 data_file.content
250 ),
251 ));
252 }
253 }
254 }
255 Ok(())
256 }
257
258 pub(crate) fn add_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
264 self.check_data_file(&entry.data_file)?;
265 if entry.sequence_number().is_some_and(|n| n >= 0) {
266 entry.status = ManifestStatus::Added;
267 entry.snapshot_id = self.snapshot_id;
268 entry.file_sequence_number = None;
269 } else {
270 entry.status = ManifestStatus::Added;
271 entry.snapshot_id = self.snapshot_id;
272 entry.sequence_number = None;
273 entry.file_sequence_number = None;
274 };
275 self.add_entry_inner(entry)?;
276 Ok(())
277 }
278
279 pub fn add_file(&mut self, data_file: DataFile, sequence_number: i64) -> Result<()> {
283 self.check_data_file(&data_file)?;
284 let entry = ManifestEntry {
285 status: ManifestStatus::Added,
286 snapshot_id: self.snapshot_id,
287 sequence_number: (sequence_number >= 0).then_some(sequence_number),
288 file_sequence_number: None,
289 data_file,
290 };
291 self.add_entry_inner(entry)?;
292 Ok(())
293 }
294
295 #[allow(dead_code)]
302 pub(crate) fn add_delete_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
303 self.check_data_file(&entry.data_file)?;
304 entry.status = ManifestStatus::Deleted;
305 entry.snapshot_id = self.snapshot_id;
306 self.add_entry_inner(entry)?;
307 Ok(())
308 }
309
310 pub fn add_delete_file(
314 &mut self,
315 data_file: DataFile,
316 sequence_number: i64,
317 file_sequence_number: Option<i64>,
318 ) -> Result<()> {
319 self.check_data_file(&data_file)?;
320 let entry = ManifestEntry {
321 status: ManifestStatus::Deleted,
322 snapshot_id: self.snapshot_id,
323 sequence_number: Some(sequence_number),
324 file_sequence_number,
325 data_file,
326 };
327 self.add_entry_inner(entry)?;
328 Ok(())
329 }
330
331 #[allow(dead_code)]
337 pub(crate) fn add_existing_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
338 self.check_data_file(&entry.data_file)?;
339 entry.status = ManifestStatus::Existing;
340 self.add_entry_inner(entry)?;
341 Ok(())
342 }
343
344 pub fn add_existing_file(
347 &mut self,
348 data_file: DataFile,
349 snapshot_id: i64,
350 sequence_number: i64,
351 file_sequence_number: Option<i64>,
352 ) -> Result<()> {
353 self.check_data_file(&data_file)?;
354 let entry = ManifestEntry {
355 status: ManifestStatus::Existing,
356 snapshot_id: Some(snapshot_id),
357 sequence_number: Some(sequence_number),
358 file_sequence_number,
359 data_file,
360 };
361 self.add_entry_inner(entry)?;
362 Ok(())
363 }
364
365 fn add_entry_inner(&mut self, entry: ManifestEntry) -> Result<()> {
366 if (entry.status == ManifestStatus::Deleted || entry.status == ManifestStatus::Existing)
368 && (entry.sequence_number.is_none() || entry.file_sequence_number.is_none())
369 {
370 return Err(Error::new(
371 ErrorKind::DataInvalid,
372 "Manifest entry with status Existing or Deleted should have sequence number",
373 ));
374 }
375
376 match entry.status {
378 ManifestStatus::Added => {
379 self.added_files += 1;
380 self.added_rows += entry.data_file.record_count;
381 }
382 ManifestStatus::Deleted => {
383 self.deleted_files += 1;
384 self.deleted_rows += entry.data_file.record_count;
385 }
386 ManifestStatus::Existing => {
387 self.existing_files += 1;
388 self.existing_rows += entry.data_file.record_count;
389 }
390 }
391 if entry.is_alive()
392 && let Some(seq_num) = entry.sequence_number
393 {
394 self.min_seq_num = Some(self.min_seq_num.map_or(seq_num, |v| min(v, seq_num)));
395 }
396 self.manifest_entries.push(entry);
397 Ok(())
398 }
399
400 pub async fn write_manifest_file(mut self) -> Result<ManifestFile> {
402 let partition_type = self
404 .metadata
405 .partition_spec
406 .partition_type(&self.metadata.schema)?;
407 let table_schema = &self.metadata.schema;
408 let avro_schema = match self.metadata.format_version {
409 FormatVersion::V1 => manifest_schema_v1(&partition_type)?,
410 FormatVersion::V2 | FormatVersion::V3 => manifest_schema_v2(&partition_type)?,
412 };
413 let mut avro_writer = AvroWriter::new(&avro_schema, Vec::new());
414 avro_writer.add_user_metadata(
415 "schema".to_string(),
416 to_vec(table_schema).map_err(|err| {
417 Error::new(ErrorKind::DataInvalid, "Fail to serialize table schema")
418 .with_source(err)
419 })?,
420 )?;
421 avro_writer.add_user_metadata(
422 "schema-id".to_string(),
423 table_schema.schema_id().to_string(),
424 )?;
425 avro_writer.add_user_metadata(
426 "partition-spec".to_string(),
427 to_vec(&self.metadata.partition_spec.fields()).map_err(|err| {
428 Error::new(ErrorKind::DataInvalid, "Fail to serialize partition spec")
429 .with_source(err)
430 })?,
431 )?;
432 avro_writer.add_user_metadata(
433 "partition-spec-id".to_string(),
434 self.metadata.partition_spec.spec_id().to_string(),
435 )?;
436 avro_writer.add_user_metadata(
437 "format-version".to_string(),
438 (self.metadata.format_version as u8).to_string(),
439 )?;
440 match self.metadata.format_version {
441 FormatVersion::V1 => {}
442 FormatVersion::V2 | FormatVersion::V3 => {
443 avro_writer
444 .add_user_metadata("content".to_string(), self.metadata.content.to_string())?;
445 }
446 }
447
448 let partition_summary = self.construct_partition_summaries(&partition_type)?;
449 for entry in std::mem::take(&mut self.manifest_entries) {
451 let value = match self.metadata.format_version {
452 FormatVersion::V1 => to_value(ManifestEntryV1::try_from(entry, &partition_type)?)?
453 .resolve(&avro_schema)?,
454 FormatVersion::V2 | FormatVersion::V3 => {
456 to_value(ManifestEntryV2::try_from(entry, &partition_type)?)?
457 .resolve(&avro_schema)?
458 }
459 };
460
461 avro_writer.append(value)?;
462 }
463
464 let content = avro_writer.into_inner()?;
465 let length = content.len();
466 self.output.write(Bytes::from(content)).await?;
467
468 Ok(ManifestFile {
469 manifest_path: self.output.location().to_string(),
470 manifest_length: length as i64,
471 partition_spec_id: self.metadata.partition_spec.spec_id(),
472 content: self.metadata.content,
473 sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
476 min_sequence_number: self.min_seq_num.unwrap_or(UNASSIGNED_SEQUENCE_NUMBER),
477 added_snapshot_id: self.snapshot_id.unwrap_or(UNASSIGNED_SNAPSHOT_ID),
478 added_files_count: Some(self.added_files),
479 existing_files_count: Some(self.existing_files),
480 deleted_files_count: Some(self.deleted_files),
481 added_rows_count: Some(self.added_rows),
482 existing_rows_count: Some(self.existing_rows),
483 deleted_rows_count: Some(self.deleted_rows),
484 partitions: Some(partition_summary),
485 key_metadata: self.key_metadata,
486 first_row_id: self.first_row_id,
487 })
488 }
489}
490
491struct PartitionFieldStats {
492 partition_type: PrimitiveType,
493
494 contains_null: bool,
495 contains_nan: Option<bool>,
496 lower_bound: Option<Datum>,
497 upper_bound: Option<Datum>,
498}
499
500impl PartitionFieldStats {
501 pub(crate) fn new(partition_type: PrimitiveType) -> Self {
502 Self {
503 partition_type,
504 contains_null: false,
505 contains_nan: Some(false),
506 upper_bound: None,
507 lower_bound: None,
508 }
509 }
510
511 pub(crate) fn update(&mut self, value: Option<PrimitiveLiteral>) -> Result<()> {
512 let Some(value) = value else {
513 self.contains_null = true;
514 return Ok(());
515 };
516 if !self.partition_type.compatible(&value) {
517 return Err(Error::new(
518 ErrorKind::DataInvalid,
519 "value is not compatible with type",
520 ));
521 }
522 let value = Datum::new(self.partition_type.clone(), value);
523
524 if value.is_nan() {
525 self.contains_nan = Some(true);
526 return Ok(());
527 }
528
529 self.lower_bound = Some(self.lower_bound.take().map_or(value.clone(), |original| {
530 if value < original {
531 value.clone()
532 } else {
533 original
534 }
535 }));
536 self.upper_bound = Some(self.upper_bound.take().map_or(value.clone(), |original| {
537 if value > original { value } else { original }
538 }));
539
540 Ok(())
541 }
542
543 pub(crate) fn finish(self) -> FieldSummary {
544 FieldSummary {
545 contains_null: self.contains_null,
546 contains_nan: self.contains_nan,
547 upper_bound: self.upper_bound.map(|v| v.to_bytes().unwrap()),
548 lower_bound: self.lower_bound.map(|v| v.to_bytes().unwrap()),
549 }
550 }
551}
552
553#[cfg(test)]
554mod tests {
555 use std::collections::HashMap;
556 use std::fs;
557 use std::sync::Arc;
558
559 use tempfile::TempDir;
560
561 use super::*;
562 use crate::io::FileIOBuilder;
563 use crate::spec::{DataFileFormat, Manifest, NestedField, PrimitiveType, Schema, Struct, Type};
564
565 #[tokio::test]
566 async fn test_add_delete_existing() {
567 let schema = Arc::new(
568 Schema::builder()
569 .with_fields(vec![
570 Arc::new(NestedField::optional(
571 1,
572 "id",
573 Type::Primitive(PrimitiveType::Int),
574 )),
575 Arc::new(NestedField::optional(
576 2,
577 "name",
578 Type::Primitive(PrimitiveType::String),
579 )),
580 ])
581 .build()
582 .unwrap(),
583 );
584 let metadata = ManifestMetadata {
585 schema_id: 0,
586 schema: schema.clone(),
587 partition_spec: PartitionSpec::builder(schema)
588 .with_spec_id(0)
589 .build()
590 .unwrap(),
591 content: ManifestContentType::Data,
592 format_version: FormatVersion::V2,
593 };
594 let mut entries = vec![
595 ManifestEntry {
596 status: ManifestStatus::Added,
597 snapshot_id: None,
598 sequence_number: Some(1),
599 file_sequence_number: Some(1),
600 data_file: DataFile {
601 content: DataContentType::Data,
602 file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
603 file_format: DataFileFormat::Parquet,
604 partition: Struct::empty(),
605 record_count: 1,
606 file_size_in_bytes: 5442,
607 column_sizes: HashMap::from([(1, 61), (2, 73)]),
608 value_counts: HashMap::from([(1, 1), (2, 1)]),
609 null_value_counts: HashMap::from([(1, 0), (2, 0)]),
610 nan_value_counts: HashMap::new(),
611 lower_bounds: HashMap::new(),
612 upper_bounds: HashMap::new(),
613 key_metadata: Some(Vec::new()),
614 split_offsets: Some(vec![4]),
615 equality_ids: None,
616 sort_order_id: None,
617 partition_spec_id: 0,
618 first_row_id: None,
619 referenced_data_file: None,
620 content_offset: None,
621 content_size_in_bytes: None,
622 },
623 },
624 ManifestEntry {
625 status: ManifestStatus::Deleted,
626 snapshot_id: Some(1),
627 sequence_number: Some(1),
628 file_sequence_number: Some(1),
629 data_file: DataFile {
630 content: DataContentType::Data,
631 file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
632 file_format: DataFileFormat::Parquet,
633 partition: Struct::empty(),
634 record_count: 1,
635 file_size_in_bytes: 5442,
636 column_sizes: HashMap::from([(1, 61), (2, 73)]),
637 value_counts: HashMap::from([(1, 1), (2, 1)]),
638 null_value_counts: HashMap::from([(1, 0), (2, 0)]),
639 nan_value_counts: HashMap::new(),
640 lower_bounds: HashMap::new(),
641 upper_bounds: HashMap::new(),
642 key_metadata: Some(Vec::new()),
643 split_offsets: Some(vec![4]),
644 equality_ids: None,
645 sort_order_id: None,
646 partition_spec_id: 0,
647 first_row_id: None,
648 referenced_data_file: None,
649 content_offset: None,
650 content_size_in_bytes: None,
651 },
652 },
653 ManifestEntry {
654 status: ManifestStatus::Existing,
655 snapshot_id: Some(1),
656 sequence_number: Some(1),
657 file_sequence_number: Some(1),
658 data_file: DataFile {
659 content: DataContentType::Data,
660 file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
661 file_format: DataFileFormat::Parquet,
662 partition: Struct::empty(),
663 record_count: 1,
664 file_size_in_bytes: 5442,
665 column_sizes: HashMap::from([(1, 61), (2, 73)]),
666 value_counts: HashMap::from([(1, 1), (2, 1)]),
667 null_value_counts: HashMap::from([(1, 0), (2, 0)]),
668 nan_value_counts: HashMap::new(),
669 lower_bounds: HashMap::new(),
670 upper_bounds: HashMap::new(),
671 key_metadata: Some(Vec::new()),
672 split_offsets: Some(vec![4]),
673 equality_ids: None,
674 sort_order_id: None,
675 partition_spec_id: 0,
676 first_row_id: None,
677 referenced_data_file: None,
678 content_offset: None,
679 content_size_in_bytes: None,
680 },
681 },
682 ];
683
684 let tmp_dir = TempDir::new().unwrap();
686 let path = tmp_dir.path().join("test_manifest.avro");
687 let io = FileIOBuilder::new_fs_io().build().unwrap();
688 let output_file = io.new_output(path.to_str().unwrap()).unwrap();
689 let mut writer = ManifestWriterBuilder::new(
690 output_file,
691 Some(3),
692 None,
693 metadata.schema.clone(),
694 metadata.partition_spec.clone(),
695 )
696 .build_v2_data();
697 writer.add_entry(entries[0].clone()).unwrap();
698 writer.add_delete_entry(entries[1].clone()).unwrap();
699 writer.add_existing_entry(entries[2].clone()).unwrap();
700 writer.write_manifest_file().await.unwrap();
701
702 let actual_manifest =
704 Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
705 .unwrap();
706
707 entries[0].snapshot_id = Some(3);
709 entries[1].snapshot_id = Some(3);
710 entries[0].file_sequence_number = None;
712 assert_eq!(actual_manifest, Manifest::new(metadata, entries));
713 }
714
715 #[tokio::test]
716 async fn test_v3_delete_manifest_delete_file_roundtrip() {
717 let schema = Arc::new(
718 Schema::builder()
719 .with_fields(vec![
720 Arc::new(NestedField::optional(
721 1,
722 "id",
723 Type::Primitive(PrimitiveType::Long),
724 )),
725 Arc::new(NestedField::optional(
726 2,
727 "data",
728 Type::Primitive(PrimitiveType::String),
729 )),
730 ])
731 .build()
732 .unwrap(),
733 );
734
735 let partition_spec = PartitionSpec::builder(schema.clone())
736 .with_spec_id(0)
737 .build()
738 .unwrap();
739
740 let delete_entry = ManifestEntry {
742 status: ManifestStatus::Added,
743 snapshot_id: None,
744 sequence_number: None,
745 file_sequence_number: None,
746 data_file: DataFile {
747 content: DataContentType::PositionDeletes,
748 file_path: "s3://bucket/table/data/delete-00000.parquet".to_string(),
749 file_format: DataFileFormat::Parquet,
750 partition: Struct::empty(),
751 record_count: 10,
752 file_size_in_bytes: 1024,
753 column_sizes: HashMap::new(),
754 value_counts: HashMap::new(),
755 null_value_counts: HashMap::new(),
756 nan_value_counts: HashMap::new(),
757 lower_bounds: HashMap::new(),
758 upper_bounds: HashMap::new(),
759 key_metadata: None,
760 split_offsets: None,
761 equality_ids: None,
762 sort_order_id: None,
763 partition_spec_id: 0,
764 first_row_id: None,
765 referenced_data_file: None,
766 content_offset: None,
767 content_size_in_bytes: None,
768 },
769 };
770
771 let tmp_dir = TempDir::new().unwrap();
773 let path = tmp_dir.path().join("v3_delete_manifest.avro");
774 let io = FileIOBuilder::new_fs_io().build().unwrap();
775 let output_file = io.new_output(path.to_str().unwrap()).unwrap();
776
777 let mut writer = ManifestWriterBuilder::new(
778 output_file,
779 Some(1),
780 None,
781 schema.clone(),
782 partition_spec.clone(),
783 )
784 .build_v3_deletes();
785
786 writer.add_entry(delete_entry).unwrap();
787 let manifest_file = writer.write_manifest_file().await.unwrap();
788
789 assert_eq!(manifest_file.content, ManifestContentType::Deletes);
791
792 let actual_manifest =
794 Manifest::parse_avro(fs::read(&path).expect("read_file must succeed").as_slice())
795 .unwrap();
796
797 assert_eq!(
799 actual_manifest.metadata().content,
800 ManifestContentType::Deletes,
801 );
802 }
803}