1use std::cmp::min;
19
20use apache_avro::{Writer as AvroWriter, to_value};
21use bytes::Bytes;
22use itertools::Itertools;
23use serde_json::to_vec;
24
25use super::{
26 Datum, FormatVersion, ManifestContentType, PartitionSpec, PrimitiveType,
27 UNASSIGNED_SEQUENCE_NUMBER,
28};
29use crate::error::Result;
30use crate::io::OutputFile;
31use crate::spec::manifest::_serde::{ManifestEntryV1, ManifestEntryV2};
32use crate::spec::manifest::{manifest_schema_v1, manifest_schema_v2};
33use crate::spec::{
34 DataContentType, DataFile, FieldSummary, ManifestEntry, ManifestFile, ManifestMetadata,
35 ManifestStatus, PrimitiveLiteral, SchemaRef, StructType,
36};
37use crate::{Error, ErrorKind};
38
39const UNASSIGNED_SNAPSHOT_ID: i64 = -1;
42
43pub struct ManifestWriterBuilder {
45 output: OutputFile,
46 snapshot_id: Option<i64>,
47 key_metadata: Option<Vec<u8>>,
48 schema: SchemaRef,
49 partition_spec: PartitionSpec,
50}
51
52impl ManifestWriterBuilder {
53 pub fn new(
55 output: OutputFile,
56 snapshot_id: Option<i64>,
57 key_metadata: Option<Vec<u8>>,
58 schema: SchemaRef,
59 partition_spec: PartitionSpec,
60 ) -> Self {
61 Self {
62 output,
63 snapshot_id,
64 key_metadata,
65 schema,
66 partition_spec,
67 }
68 }
69
70 pub fn build_v1(self) -> ManifestWriter {
72 let metadata = ManifestMetadata::builder()
73 .schema_id(self.schema.schema_id())
74 .schema(self.schema)
75 .partition_spec(self.partition_spec)
76 .format_version(FormatVersion::V1)
77 .content(ManifestContentType::Data)
78 .build();
79 ManifestWriter::new(
80 self.output,
81 self.snapshot_id,
82 self.key_metadata,
83 metadata,
84 None,
85 )
86 }
87
88 pub fn build_v2_data(self) -> ManifestWriter {
90 let metadata = ManifestMetadata::builder()
91 .schema_id(self.schema.schema_id())
92 .schema(self.schema)
93 .partition_spec(self.partition_spec)
94 .format_version(FormatVersion::V2)
95 .content(ManifestContentType::Data)
96 .build();
97 ManifestWriter::new(
98 self.output,
99 self.snapshot_id,
100 self.key_metadata,
101 metadata,
102 None,
103 )
104 }
105
106 pub fn build_v2_deletes(self) -> ManifestWriter {
108 let metadata = ManifestMetadata::builder()
109 .schema_id(self.schema.schema_id())
110 .schema(self.schema)
111 .partition_spec(self.partition_spec)
112 .format_version(FormatVersion::V2)
113 .content(ManifestContentType::Deletes)
114 .build();
115 ManifestWriter::new(
116 self.output,
117 self.snapshot_id,
118 self.key_metadata,
119 metadata,
120 None,
121 )
122 }
123
124 pub fn build_v3_data(self) -> ManifestWriter {
126 let metadata = ManifestMetadata::builder()
127 .schema_id(self.schema.schema_id())
128 .schema(self.schema)
129 .partition_spec(self.partition_spec)
130 .format_version(FormatVersion::V3)
131 .content(ManifestContentType::Data)
132 .build();
133 ManifestWriter::new(
134 self.output,
135 self.snapshot_id,
136 self.key_metadata,
137 metadata,
138 None,
141 )
142 }
143
144 pub fn build_v3_deletes(self) -> ManifestWriter {
146 let metadata = ManifestMetadata::builder()
147 .schema_id(self.schema.schema_id())
148 .schema(self.schema)
149 .partition_spec(self.partition_spec)
150 .format_version(FormatVersion::V3)
151 .content(ManifestContentType::Deletes)
152 .build();
153 ManifestWriter::new(
154 self.output,
155 self.snapshot_id,
156 self.key_metadata,
157 metadata,
158 None,
159 )
160 }
161}
162
163pub struct ManifestWriter {
165 output: OutputFile,
166
167 snapshot_id: Option<i64>,
168
169 added_files: u32,
170 added_rows: u64,
171 existing_files: u32,
172 existing_rows: u64,
173 deleted_files: u32,
174 deleted_rows: u64,
175 first_row_id: Option<u64>,
176
177 min_seq_num: Option<i64>,
178
179 key_metadata: Option<Vec<u8>>,
180
181 manifest_entries: Vec<ManifestEntry>,
182
183 metadata: ManifestMetadata,
184}
185
186impl ManifestWriter {
187 pub(crate) fn new(
189 output: OutputFile,
190 snapshot_id: Option<i64>,
191 key_metadata: Option<Vec<u8>>,
192 metadata: ManifestMetadata,
193 first_row_id: Option<u64>,
194 ) -> Self {
195 Self {
196 output,
197 snapshot_id,
198 added_files: 0,
199 added_rows: 0,
200 existing_files: 0,
201 existing_rows: 0,
202 deleted_files: 0,
203 deleted_rows: 0,
204 first_row_id,
205 min_seq_num: None,
206 key_metadata,
207 manifest_entries: Vec::new(),
208 metadata,
209 }
210 }
211
212 fn construct_partition_summaries(
213 &mut self,
214 partition_type: &StructType,
215 ) -> Result<Vec<FieldSummary>> {
216 let mut field_stats: Vec<_> = partition_type
217 .fields()
218 .iter()
219 .map(|f| PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone()))
220 .collect();
221 for partition in self.manifest_entries.iter().map(|e| &e.data_file.partition) {
222 for (literal, stat) in partition.iter().zip_eq(field_stats.iter_mut()) {
223 let primitive_literal = literal.map(|v| v.as_primitive_literal().unwrap());
224 stat.update(primitive_literal)?;
225 }
226 }
227 Ok(field_stats.into_iter().map(|stat| stat.finish()).collect())
228 }
229
230 fn check_data_file(&self, data_file: &DataFile) -> Result<()> {
231 match self.metadata.content {
232 ManifestContentType::Data => {
233 if data_file.content != DataContentType::Data {
234 return Err(Error::new(
235 ErrorKind::DataInvalid,
236 format!(
237 "Date file at path {} with manifest content type `data`, should have DataContentType `Data`, but has `{:?}`",
238 data_file.file_path(),
239 data_file.content
240 ),
241 ));
242 }
243 }
244 ManifestContentType::Deletes => {
245 if data_file.content != DataContentType::EqualityDeletes
246 && data_file.content != DataContentType::PositionDeletes
247 {
248 return Err(Error::new(
249 ErrorKind::DataInvalid,
250 format!(
251 "Date file at path {} with manifest content type `deletes`, should have DataContentType `Data`, but has `{:?}`",
252 data_file.file_path(),
253 data_file.content
254 ),
255 ));
256 }
257 }
258 }
259 Ok(())
260 }
261
262 pub(crate) fn add_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
268 self.check_data_file(&entry.data_file)?;
269 if entry.sequence_number().is_some_and(|n| n >= 0) {
270 entry.status = ManifestStatus::Added;
271 entry.snapshot_id = self.snapshot_id;
272 entry.file_sequence_number = None;
273 } else {
274 entry.status = ManifestStatus::Added;
275 entry.snapshot_id = self.snapshot_id;
276 entry.sequence_number = None;
277 entry.file_sequence_number = None;
278 };
279 self.add_entry_inner(entry)?;
280 Ok(())
281 }
282
283 pub fn add_file(&mut self, data_file: DataFile, sequence_number: i64) -> Result<()> {
287 self.check_data_file(&data_file)?;
288 let entry = ManifestEntry {
289 status: ManifestStatus::Added,
290 snapshot_id: self.snapshot_id,
291 sequence_number: (sequence_number >= 0).then_some(sequence_number),
292 file_sequence_number: None,
293 data_file,
294 };
295 self.add_entry_inner(entry)?;
296 Ok(())
297 }
298
299 #[allow(dead_code)]
306 pub(crate) fn add_delete_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
307 self.check_data_file(&entry.data_file)?;
308 entry.status = ManifestStatus::Deleted;
309 entry.snapshot_id = self.snapshot_id;
310 self.add_entry_inner(entry)?;
311 Ok(())
312 }
313
314 pub fn add_delete_file(
318 &mut self,
319 data_file: DataFile,
320 sequence_number: i64,
321 file_sequence_number: Option<i64>,
322 ) -> Result<()> {
323 self.check_data_file(&data_file)?;
324 let entry = ManifestEntry {
325 status: ManifestStatus::Deleted,
326 snapshot_id: self.snapshot_id,
327 sequence_number: Some(sequence_number),
328 file_sequence_number,
329 data_file,
330 };
331 self.add_entry_inner(entry)?;
332 Ok(())
333 }
334
335 #[allow(dead_code)]
341 pub(crate) fn add_existing_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
342 self.check_data_file(&entry.data_file)?;
343 entry.status = ManifestStatus::Existing;
344 self.add_entry_inner(entry)?;
345 Ok(())
346 }
347
348 pub fn add_existing_file(
351 &mut self,
352 data_file: DataFile,
353 snapshot_id: i64,
354 sequence_number: i64,
355 file_sequence_number: Option<i64>,
356 ) -> Result<()> {
357 self.check_data_file(&data_file)?;
358 let entry = ManifestEntry {
359 status: ManifestStatus::Existing,
360 snapshot_id: Some(snapshot_id),
361 sequence_number: Some(sequence_number),
362 file_sequence_number,
363 data_file,
364 };
365 self.add_entry_inner(entry)?;
366 Ok(())
367 }
368
369 fn add_entry_inner(&mut self, entry: ManifestEntry) -> Result<()> {
370 if (entry.status == ManifestStatus::Deleted || entry.status == ManifestStatus::Existing)
372 && (entry.sequence_number.is_none() || entry.file_sequence_number.is_none())
373 {
374 return Err(Error::new(
375 ErrorKind::DataInvalid,
376 "Manifest entry with status Existing or Deleted should have sequence number",
377 ));
378 }
379
380 match entry.status {
382 ManifestStatus::Added => {
383 self.added_files += 1;
384 self.added_rows += entry.data_file.record_count;
385 }
386 ManifestStatus::Deleted => {
387 self.deleted_files += 1;
388 self.deleted_rows += entry.data_file.record_count;
389 }
390 ManifestStatus::Existing => {
391 self.existing_files += 1;
392 self.existing_rows += entry.data_file.record_count;
393 }
394 }
395 if entry.is_alive()
396 && let Some(seq_num) = entry.sequence_number
397 {
398 self.min_seq_num = Some(self.min_seq_num.map_or(seq_num, |v| min(v, seq_num)));
399 }
400 self.manifest_entries.push(entry);
401 Ok(())
402 }
403
404 pub async fn write_manifest_file(mut self) -> Result<ManifestFile> {
406 let partition_type = self
408 .metadata
409 .partition_spec
410 .partition_type(&self.metadata.schema)?;
411 let table_schema = &self.metadata.schema;
412 let avro_schema = match self.metadata.format_version {
413 FormatVersion::V1 => manifest_schema_v1(&partition_type)?,
414 FormatVersion::V2 | FormatVersion::V3 => manifest_schema_v2(&partition_type)?,
416 };
417 let mut avro_writer = AvroWriter::new(&avro_schema, Vec::new());
418 avro_writer.add_user_metadata(
419 "schema".to_string(),
420 to_vec(table_schema).map_err(|err| {
421 Error::new(ErrorKind::DataInvalid, "Fail to serialize table schema")
422 .with_source(err)
423 })?,
424 )?;
425 avro_writer.add_user_metadata(
426 "schema-id".to_string(),
427 table_schema.schema_id().to_string(),
428 )?;
429 avro_writer.add_user_metadata(
430 "partition-spec".to_string(),
431 to_vec(&self.metadata.partition_spec.fields()).map_err(|err| {
432 Error::new(ErrorKind::DataInvalid, "Fail to serialize partition spec")
433 .with_source(err)
434 })?,
435 )?;
436 avro_writer.add_user_metadata(
437 "partition-spec-id".to_string(),
438 self.metadata.partition_spec.spec_id().to_string(),
439 )?;
440 avro_writer.add_user_metadata(
441 "format-version".to_string(),
442 (self.metadata.format_version as u8).to_string(),
443 )?;
444 match self.metadata.format_version {
445 FormatVersion::V1 => {}
446 FormatVersion::V2 | FormatVersion::V3 => {
447 avro_writer
448 .add_user_metadata("content".to_string(), self.metadata.content.to_string())?;
449 }
450 }
451
452 let partition_summary = self.construct_partition_summaries(&partition_type)?;
453 for entry in std::mem::take(&mut self.manifest_entries) {
455 let value = match self.metadata.format_version {
456 FormatVersion::V1 => to_value(ManifestEntryV1::try_from(entry, &partition_type)?)?
457 .resolve(&avro_schema)?,
458 FormatVersion::V2 | FormatVersion::V3 => {
460 to_value(ManifestEntryV2::try_from(entry, &partition_type)?)?
461 .resolve(&avro_schema)?
462 }
463 };
464
465 avro_writer.append(value)?;
466 }
467
468 let content = avro_writer.into_inner()?;
469 let length = content.len();
470 self.output.write(Bytes::from(content)).await?;
471
472 Ok(ManifestFile {
473 manifest_path: self.output.location().to_string(),
474 manifest_length: length as i64,
475 partition_spec_id: self.metadata.partition_spec.spec_id(),
476 content: self.metadata.content,
477 sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
480 min_sequence_number: self.min_seq_num.unwrap_or(UNASSIGNED_SEQUENCE_NUMBER),
481 added_snapshot_id: self.snapshot_id.unwrap_or(UNASSIGNED_SNAPSHOT_ID),
482 added_files_count: Some(self.added_files),
483 existing_files_count: Some(self.existing_files),
484 deleted_files_count: Some(self.deleted_files),
485 added_rows_count: Some(self.added_rows),
486 existing_rows_count: Some(self.existing_rows),
487 deleted_rows_count: Some(self.deleted_rows),
488 partitions: Some(partition_summary),
489 key_metadata: self.key_metadata,
490 first_row_id: self.first_row_id,
491 })
492 }
493}
494
495struct PartitionFieldStats {
496 partition_type: PrimitiveType,
497
498 contains_null: bool,
499 contains_nan: Option<bool>,
500 lower_bound: Option<Datum>,
501 upper_bound: Option<Datum>,
502}
503
504impl PartitionFieldStats {
505 pub(crate) fn new(partition_type: PrimitiveType) -> Self {
506 Self {
507 partition_type,
508 contains_null: false,
509 contains_nan: Some(false),
510 upper_bound: None,
511 lower_bound: None,
512 }
513 }
514
515 pub(crate) fn update(&mut self, value: Option<PrimitiveLiteral>) -> Result<()> {
516 let Some(value) = value else {
517 self.contains_null = true;
518 return Ok(());
519 };
520 if !self.partition_type.compatible(&value) {
521 return Err(Error::new(
522 ErrorKind::DataInvalid,
523 "value is not compatible with type",
524 ));
525 }
526 let value = Datum::new(self.partition_type.clone(), value);
527
528 if value.is_nan() {
529 self.contains_nan = Some(true);
530 return Ok(());
531 }
532
533 self.lower_bound = Some(self.lower_bound.take().map_or(value.clone(), |original| {
534 if value < original {
535 value.clone()
536 } else {
537 original
538 }
539 }));
540 self.upper_bound = Some(self.upper_bound.take().map_or(value.clone(), |original| {
541 if value > original { value } else { original }
542 }));
543
544 Ok(())
545 }
546
547 pub(crate) fn finish(self) -> FieldSummary {
548 FieldSummary {
549 contains_null: self.contains_null,
550 contains_nan: self.contains_nan,
551 upper_bound: self.upper_bound.map(|v| v.to_bytes().unwrap()),
552 lower_bound: self.lower_bound.map(|v| v.to_bytes().unwrap()),
553 }
554 }
555}
556
557#[cfg(test)]
558mod tests {
559 use std::collections::HashMap;
560 use std::fs;
561 use std::sync::Arc;
562
563 use tempfile::TempDir;
564
565 use super::*;
566 use crate::io::FileIO;
567 use crate::spec::{DataFileFormat, Manifest, NestedField, PrimitiveType, Schema, Struct, Type};
568
569 #[tokio::test]
570 async fn test_add_delete_existing() {
571 let schema = Arc::new(
572 Schema::builder()
573 .with_fields(vec![
574 Arc::new(NestedField::optional(
575 1,
576 "id",
577 Type::Primitive(PrimitiveType::Int),
578 )),
579 Arc::new(NestedField::optional(
580 2,
581 "name",
582 Type::Primitive(PrimitiveType::String),
583 )),
584 ])
585 .build()
586 .unwrap(),
587 );
588 let metadata = ManifestMetadata {
589 schema_id: 0,
590 schema: schema.clone(),
591 partition_spec: PartitionSpec::builder(schema)
592 .with_spec_id(0)
593 .build()
594 .unwrap(),
595 content: ManifestContentType::Data,
596 format_version: FormatVersion::V2,
597 };
598 let mut entries = vec![
599 ManifestEntry {
600 status: ManifestStatus::Added,
601 snapshot_id: None,
602 sequence_number: Some(1),
603 file_sequence_number: Some(1),
604 data_file: DataFile {
605 content: DataContentType::Data,
606 file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
607 file_format: DataFileFormat::Parquet,
608 partition: Struct::empty(),
609 record_count: 1,
610 file_size_in_bytes: 5442,
611 column_sizes: HashMap::from([(1, 61), (2, 73)]),
612 value_counts: HashMap::from([(1, 1), (2, 1)]),
613 null_value_counts: HashMap::from([(1, 0), (2, 0)]),
614 nan_value_counts: HashMap::new(),
615 lower_bounds: HashMap::new(),
616 upper_bounds: HashMap::new(),
617 key_metadata: Some(Vec::new()),
618 split_offsets: Some(vec![4]),
619 equality_ids: None,
620 sort_order_id: None,
621 partition_spec_id: 0,
622 first_row_id: None,
623 referenced_data_file: None,
624 content_offset: None,
625 content_size_in_bytes: None,
626 },
627 },
628 ManifestEntry {
629 status: ManifestStatus::Deleted,
630 snapshot_id: Some(1),
631 sequence_number: Some(1),
632 file_sequence_number: Some(1),
633 data_file: DataFile {
634 content: DataContentType::Data,
635 file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
636 file_format: DataFileFormat::Parquet,
637 partition: Struct::empty(),
638 record_count: 1,
639 file_size_in_bytes: 5442,
640 column_sizes: HashMap::from([(1, 61), (2, 73)]),
641 value_counts: HashMap::from([(1, 1), (2, 1)]),
642 null_value_counts: HashMap::from([(1, 0), (2, 0)]),
643 nan_value_counts: HashMap::new(),
644 lower_bounds: HashMap::new(),
645 upper_bounds: HashMap::new(),
646 key_metadata: Some(Vec::new()),
647 split_offsets: Some(vec![4]),
648 equality_ids: None,
649 sort_order_id: None,
650 partition_spec_id: 0,
651 first_row_id: None,
652 referenced_data_file: None,
653 content_offset: None,
654 content_size_in_bytes: None,
655 },
656 },
657 ManifestEntry {
658 status: ManifestStatus::Existing,
659 snapshot_id: Some(1),
660 sequence_number: Some(1),
661 file_sequence_number: Some(1),
662 data_file: DataFile {
663 content: DataContentType::Data,
664 file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
665 file_format: DataFileFormat::Parquet,
666 partition: Struct::empty(),
667 record_count: 1,
668 file_size_in_bytes: 5442,
669 column_sizes: HashMap::from([(1, 61), (2, 73)]),
670 value_counts: HashMap::from([(1, 1), (2, 1)]),
671 null_value_counts: HashMap::from([(1, 0), (2, 0)]),
672 nan_value_counts: HashMap::new(),
673 lower_bounds: HashMap::new(),
674 upper_bounds: HashMap::new(),
675 key_metadata: Some(Vec::new()),
676 split_offsets: Some(vec![4]),
677 equality_ids: None,
678 sort_order_id: None,
679 partition_spec_id: 0,
680 first_row_id: None,
681 referenced_data_file: None,
682 content_offset: None,
683 content_size_in_bytes: None,
684 },
685 },
686 ];
687
688 let tmp_dir = TempDir::new().unwrap();
690 let path = tmp_dir.path().join("test_manifest.avro");
691 let io = FileIO::new_with_fs();
692 let output_file = io.new_output(path.to_str().unwrap()).unwrap();
693 let mut writer = ManifestWriterBuilder::new(
694 output_file,
695 Some(3),
696 None,
697 metadata.schema.clone(),
698 metadata.partition_spec.clone(),
699 )
700 .build_v2_data();
701 writer.add_entry(entries[0].clone()).unwrap();
702 writer.add_delete_entry(entries[1].clone()).unwrap();
703 writer.add_existing_entry(entries[2].clone()).unwrap();
704 writer.write_manifest_file().await.unwrap();
705
706 let actual_manifest =
708 Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
709 .unwrap();
710
711 entries[0].snapshot_id = Some(3);
713 entries[1].snapshot_id = Some(3);
714 entries[0].file_sequence_number = None;
716 assert_eq!(actual_manifest, Manifest::new(metadata, entries));
717 }
718
719 #[tokio::test]
720 async fn test_v3_delete_manifest_delete_file_roundtrip() {
721 let schema = Arc::new(
722 Schema::builder()
723 .with_fields(vec![
724 Arc::new(NestedField::optional(
725 1,
726 "id",
727 Type::Primitive(PrimitiveType::Long),
728 )),
729 Arc::new(NestedField::optional(
730 2,
731 "data",
732 Type::Primitive(PrimitiveType::String),
733 )),
734 ])
735 .build()
736 .unwrap(),
737 );
738
739 let partition_spec = PartitionSpec::builder(schema.clone())
740 .with_spec_id(0)
741 .build()
742 .unwrap();
743
744 let delete_entry = ManifestEntry {
746 status: ManifestStatus::Added,
747 snapshot_id: None,
748 sequence_number: None,
749 file_sequence_number: None,
750 data_file: DataFile {
751 content: DataContentType::PositionDeletes,
752 file_path: "s3://bucket/table/data/delete-00000.parquet".to_string(),
753 file_format: DataFileFormat::Parquet,
754 partition: Struct::empty(),
755 record_count: 10,
756 file_size_in_bytes: 1024,
757 column_sizes: HashMap::new(),
758 value_counts: HashMap::new(),
759 null_value_counts: HashMap::new(),
760 nan_value_counts: HashMap::new(),
761 lower_bounds: HashMap::new(),
762 upper_bounds: HashMap::new(),
763 key_metadata: None,
764 split_offsets: None,
765 equality_ids: None,
766 sort_order_id: None,
767 partition_spec_id: 0,
768 first_row_id: None,
769 referenced_data_file: None,
770 content_offset: None,
771 content_size_in_bytes: None,
772 },
773 };
774
775 let tmp_dir = TempDir::new().unwrap();
777 let path = tmp_dir.path().join("v3_delete_manifest.avro");
778 let io = FileIO::new_with_fs();
779 let output_file = io.new_output(path.to_str().unwrap()).unwrap();
780
781 let mut writer = ManifestWriterBuilder::new(
782 output_file,
783 Some(1),
784 None,
785 schema.clone(),
786 partition_spec.clone(),
787 )
788 .build_v3_deletes();
789
790 writer.add_entry(delete_entry).unwrap();
791 let manifest_file = writer.write_manifest_file().await.unwrap();
792
793 assert_eq!(manifest_file.content, ManifestContentType::Deletes);
795
796 let actual_manifest =
798 Manifest::parse_avro(fs::read(&path).expect("read_file must succeed").as_slice())
799 .unwrap();
800
801 assert_eq!(
803 actual_manifest.metadata().content,
804 ManifestContentType::Deletes,
805 );
806 }
807}