1use std::cmp::min;
19use std::future::Future;
20use std::pin::Pin;
21
22use apache_avro::{Writer as AvroWriter, to_value};
23use bytes::Bytes;
24use itertools::Itertools;
25use serde_json::to_vec;
26
27use super::{
28 Datum, FormatVersion, ManifestContentType, PartitionSpec, PrimitiveType,
29 UNASSIGNED_SEQUENCE_NUMBER,
30};
31use crate::encryption::EncryptedOutputFile;
32use crate::error::Result;
33use crate::io::{FileWrite, OutputFile};
34use crate::spec::manifest::_serde::{ManifestEntryV1, ManifestEntryV2};
35use crate::spec::manifest::{manifest_schema_v1, manifest_schema_v2};
36use crate::spec::{
37 DataContentType, DataFile, FieldSummary, ManifestEntry, ManifestFile, ManifestMetadata,
38 ManifestStatus, PrimitiveLiteral, SchemaRef, StructType,
39};
40use crate::{Error, ErrorKind};
41
/// Placeholder snapshot id reported as `added_snapshot_id` in the [`ManifestFile`]
/// returned by `ManifestWriter::write_manifest_file` when the writer was created
/// without a snapshot id.
const UNASSIGNED_SNAPSHOT_ID: i64 = -1;
45
/// Boxed, pinned, `Send` future that resolves to the [`FileWrite`] the manifest bytes
/// are written to.
///
/// Type erasure lets the builder store the writer-opening future of either an
/// [`OutputFile`] or an [`EncryptedOutputFile`] in the same field; the future is only
/// awaited when the manifest is actually written.
type WriterFuture = Pin<Box<dyn Future<Output = Result<Box<dyn FileWrite>>> + Send>>;
47
/// Collects everything needed to create a [`ManifestWriter`]; one of the `build_*`
/// methods then fixes the format version and the manifest content type.
pub struct ManifestWriterBuilder {
    /// Future that opens the underlying file writer; handed to the [`ManifestWriter`].
    writer_future: WriterFuture,
    /// Location of the output file, captured when the builder is created.
    location: String,
    /// Snapshot id handed to the writer, if one is already known.
    snapshot_id: Option<i64>,
    /// Optional key metadata bytes, passed through to the writer unchanged.
    key_metadata: Option<Vec<u8>>,
    /// Table schema stored in the manifest metadata.
    schema: SchemaRef,
    /// Partition spec stored in the manifest metadata.
    partition_spec: PartitionSpec,
}
57
58impl ManifestWriterBuilder {
59 pub fn new(
61 output: OutputFile,
62 snapshot_id: Option<i64>,
63 key_metadata: Option<Vec<u8>>,
64 schema: SchemaRef,
65 partition_spec: PartitionSpec,
66 ) -> Self {
67 let location = output.location().to_owned();
68 Self {
69 writer_future: Box::pin(async move { output.writer().await }),
70 location,
71 snapshot_id,
72 key_metadata,
73 schema,
74 partition_spec,
75 }
76 }
77
78 pub fn new_from_encrypted(
82 encrypted_output: EncryptedOutputFile,
83 snapshot_id: Option<i64>,
84 key_metadata: Option<Vec<u8>>,
85 schema: SchemaRef,
86 partition_spec: PartitionSpec,
87 ) -> Self {
88 let location = encrypted_output.location().to_owned();
89 Self {
90 writer_future: Box::pin(async move { encrypted_output.writer().await }),
91 location,
92 snapshot_id,
93 key_metadata,
94 schema,
95 partition_spec,
96 }
97 }
98
99 pub fn build_v1(self) -> ManifestWriter {
101 let metadata = ManifestMetadata::builder()
102 .schema_id(self.schema.schema_id())
103 .schema(self.schema)
104 .partition_spec(self.partition_spec)
105 .format_version(FormatVersion::V1)
106 .content(ManifestContentType::Data)
107 .build();
108 ManifestWriter::new(
109 self.writer_future,
110 self.location,
111 self.snapshot_id,
112 self.key_metadata,
113 metadata,
114 None,
115 )
116 }
117
118 pub fn build_v2_data(self) -> ManifestWriter {
120 let metadata = ManifestMetadata::builder()
121 .schema_id(self.schema.schema_id())
122 .schema(self.schema)
123 .partition_spec(self.partition_spec)
124 .format_version(FormatVersion::V2)
125 .content(ManifestContentType::Data)
126 .build();
127 ManifestWriter::new(
128 self.writer_future,
129 self.location,
130 self.snapshot_id,
131 self.key_metadata,
132 metadata,
133 None,
134 )
135 }
136
137 pub fn build_v2_deletes(self) -> ManifestWriter {
139 let metadata = ManifestMetadata::builder()
140 .schema_id(self.schema.schema_id())
141 .schema(self.schema)
142 .partition_spec(self.partition_spec)
143 .format_version(FormatVersion::V2)
144 .content(ManifestContentType::Deletes)
145 .build();
146 ManifestWriter::new(
147 self.writer_future,
148 self.location,
149 self.snapshot_id,
150 self.key_metadata,
151 metadata,
152 None,
153 )
154 }
155
156 pub fn build_v3_data(self) -> ManifestWriter {
158 let metadata = ManifestMetadata::builder()
159 .schema_id(self.schema.schema_id())
160 .schema(self.schema)
161 .partition_spec(self.partition_spec)
162 .format_version(FormatVersion::V3)
163 .content(ManifestContentType::Data)
164 .build();
165 ManifestWriter::new(
166 self.writer_future,
167 self.location,
168 self.snapshot_id,
169 self.key_metadata,
170 metadata,
171 None,
174 )
175 }
176
177 pub fn build_v3_deletes(self) -> ManifestWriter {
179 let metadata = ManifestMetadata::builder()
180 .schema_id(self.schema.schema_id())
181 .schema(self.schema)
182 .partition_spec(self.partition_spec)
183 .format_version(FormatVersion::V3)
184 .content(ManifestContentType::Deletes)
185 .build();
186 ManifestWriter::new(
187 self.writer_future,
188 self.location,
189 self.snapshot_id,
190 self.key_metadata,
191 metadata,
192 None,
193 )
194 }
195}
196
/// Buffers manifest entries in memory, tracks summary counters for them, and finally
/// serializes everything into one Avro manifest file.
pub struct ManifestWriter {
    /// Future that opens the output file; awaited once when the manifest is written.
    writer_future: WriterFuture,
    /// Location of the output file, reported as the manifest path.
    location: String,

    /// Snapshot id stamped on added and deleted entries; also reported as the
    /// manifest's `added_snapshot_id` (with a placeholder when `None`).
    snapshot_id: Option<i64>,

    /// Number of buffered entries with status `Added`.
    added_files: u32,
    /// Sum of `record_count` over entries with status `Added`.
    added_rows: u64,
    /// Number of buffered entries with status `Existing`.
    existing_files: u32,
    /// Sum of `record_count` over entries with status `Existing`.
    existing_rows: u64,
    /// Number of buffered entries with status `Deleted`.
    deleted_files: u32,
    /// Sum of `record_count` over entries with status `Deleted`.
    deleted_rows: u64,
    /// Passed through unchanged into the resulting `ManifestFile`.
    first_row_id: Option<u64>,

    /// Smallest sequence number seen on an entry for which `is_alive()` is true.
    min_seq_num: Option<i64>,

    /// Passed through unchanged into the resulting `ManifestFile`.
    key_metadata: Option<Vec<u8>>,

    /// Entries buffered until the manifest is written.
    manifest_entries: Vec<ManifestEntry>,

    /// Schema, partition spec, format version and content type of this manifest.
    metadata: ManifestMetadata,
}
220
221impl ManifestWriter {
222 pub(crate) fn new(
224 writer_future: WriterFuture,
225 location: String,
226 snapshot_id: Option<i64>,
227 key_metadata: Option<Vec<u8>>,
228 metadata: ManifestMetadata,
229 first_row_id: Option<u64>,
230 ) -> Self {
231 Self {
232 writer_future,
233 location,
234 snapshot_id,
235 added_files: 0,
236 added_rows: 0,
237 existing_files: 0,
238 existing_rows: 0,
239 deleted_files: 0,
240 deleted_rows: 0,
241 first_row_id,
242 min_seq_num: None,
243 key_metadata,
244 manifest_entries: Vec::new(),
245 metadata,
246 }
247 }
248
249 fn construct_partition_summaries(
250 &mut self,
251 partition_type: &StructType,
252 ) -> Result<Vec<FieldSummary>> {
253 let mut field_stats: Vec<_> = partition_type
254 .fields()
255 .iter()
256 .map(|f| PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone()))
257 .collect();
258 for partition in self.manifest_entries.iter().map(|e| &e.data_file.partition) {
259 for (literal, stat) in partition.iter().zip_eq(field_stats.iter_mut()) {
260 let primitive_literal = literal.map(|v| v.as_primitive_literal().unwrap());
261 stat.update(primitive_literal)?;
262 }
263 }
264 Ok(field_stats.into_iter().map(|stat| stat.finish()).collect())
265 }
266
267 fn check_data_file(&self, data_file: &DataFile) -> Result<()> {
268 match self.metadata.content {
269 ManifestContentType::Data => {
270 if data_file.content != DataContentType::Data {
271 return Err(Error::new(
272 ErrorKind::DataInvalid,
273 format!(
274 "Date file at path {} with manifest content type `data`, should have DataContentType `Data`, but has `{:?}`",
275 data_file.file_path(),
276 data_file.content
277 ),
278 ));
279 }
280 }
281 ManifestContentType::Deletes => {
282 if data_file.content != DataContentType::EqualityDeletes
283 && data_file.content != DataContentType::PositionDeletes
284 {
285 return Err(Error::new(
286 ErrorKind::DataInvalid,
287 format!(
288 "Date file at path {} with manifest content type `deletes`, should have DataContentType `Data`, but has `{:?}`",
289 data_file.file_path(),
290 data_file.content
291 ),
292 ));
293 }
294 }
295 }
296 Ok(())
297 }
298
299 pub(crate) fn add_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
305 self.check_data_file(&entry.data_file)?;
306 if entry.sequence_number().is_some_and(|n| n >= 0) {
307 entry.status = ManifestStatus::Added;
308 entry.snapshot_id = self.snapshot_id;
309 entry.file_sequence_number = None;
310 } else {
311 entry.status = ManifestStatus::Added;
312 entry.snapshot_id = self.snapshot_id;
313 entry.sequence_number = None;
314 entry.file_sequence_number = None;
315 };
316 self.add_entry_inner(entry)?;
317 Ok(())
318 }
319
320 pub fn add_file(&mut self, data_file: DataFile, sequence_number: i64) -> Result<()> {
324 self.check_data_file(&data_file)?;
325 let entry = ManifestEntry {
326 status: ManifestStatus::Added,
327 snapshot_id: self.snapshot_id,
328 sequence_number: (sequence_number >= 0).then_some(sequence_number),
329 file_sequence_number: None,
330 data_file,
331 };
332 self.add_entry_inner(entry)?;
333 Ok(())
334 }
335
336 #[allow(dead_code)]
343 pub(crate) fn add_delete_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
344 self.check_data_file(&entry.data_file)?;
345 entry.status = ManifestStatus::Deleted;
346 entry.snapshot_id = self.snapshot_id;
347 self.add_entry_inner(entry)?;
348 Ok(())
349 }
350
351 pub fn add_delete_file(
355 &mut self,
356 data_file: DataFile,
357 sequence_number: i64,
358 file_sequence_number: Option<i64>,
359 ) -> Result<()> {
360 self.check_data_file(&data_file)?;
361 let entry = ManifestEntry {
362 status: ManifestStatus::Deleted,
363 snapshot_id: self.snapshot_id,
364 sequence_number: Some(sequence_number),
365 file_sequence_number,
366 data_file,
367 };
368 self.add_entry_inner(entry)?;
369 Ok(())
370 }
371
372 #[allow(dead_code)]
378 pub(crate) fn add_existing_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
379 self.check_data_file(&entry.data_file)?;
380 entry.status = ManifestStatus::Existing;
381 self.add_entry_inner(entry)?;
382 Ok(())
383 }
384
385 pub fn add_existing_file(
388 &mut self,
389 data_file: DataFile,
390 snapshot_id: i64,
391 sequence_number: i64,
392 file_sequence_number: Option<i64>,
393 ) -> Result<()> {
394 self.check_data_file(&data_file)?;
395 let entry = ManifestEntry {
396 status: ManifestStatus::Existing,
397 snapshot_id: Some(snapshot_id),
398 sequence_number: Some(sequence_number),
399 file_sequence_number,
400 data_file,
401 };
402 self.add_entry_inner(entry)?;
403 Ok(())
404 }
405
406 fn add_entry_inner(&mut self, entry: ManifestEntry) -> Result<()> {
407 if (entry.status == ManifestStatus::Deleted || entry.status == ManifestStatus::Existing)
409 && (entry.sequence_number.is_none() || entry.file_sequence_number.is_none())
410 {
411 return Err(Error::new(
412 ErrorKind::DataInvalid,
413 "Manifest entry with status Existing or Deleted should have sequence number",
414 ));
415 }
416
417 match entry.status {
419 ManifestStatus::Added => {
420 self.added_files += 1;
421 self.added_rows += entry.data_file.record_count;
422 }
423 ManifestStatus::Deleted => {
424 self.deleted_files += 1;
425 self.deleted_rows += entry.data_file.record_count;
426 }
427 ManifestStatus::Existing => {
428 self.existing_files += 1;
429 self.existing_rows += entry.data_file.record_count;
430 }
431 }
432 if entry.is_alive()
433 && let Some(seq_num) = entry.sequence_number
434 {
435 self.min_seq_num = Some(self.min_seq_num.map_or(seq_num, |v| min(v, seq_num)));
436 }
437 self.manifest_entries.push(entry);
438 Ok(())
439 }
440
441 pub async fn write_manifest_file(mut self) -> Result<ManifestFile> {
443 let partition_type = self
445 .metadata
446 .partition_spec
447 .partition_type(&self.metadata.schema)?;
448 let table_schema = &self.metadata.schema;
449 let avro_schema = match self.metadata.format_version {
450 FormatVersion::V1 => manifest_schema_v1(&partition_type)?,
451 FormatVersion::V2 | FormatVersion::V3 => manifest_schema_v2(&partition_type)?,
453 };
454 let mut avro_writer = AvroWriter::new(&avro_schema, Vec::new());
455 avro_writer.add_user_metadata(
456 "schema".to_string(),
457 to_vec(table_schema).map_err(|err| {
458 Error::new(ErrorKind::DataInvalid, "Fail to serialize table schema")
459 .with_source(err)
460 })?,
461 )?;
462 avro_writer.add_user_metadata(
463 "schema-id".to_string(),
464 table_schema.schema_id().to_string(),
465 )?;
466 avro_writer.add_user_metadata(
467 "partition-spec".to_string(),
468 to_vec(&self.metadata.partition_spec.fields()).map_err(|err| {
469 Error::new(ErrorKind::DataInvalid, "Fail to serialize partition spec")
470 .with_source(err)
471 })?,
472 )?;
473 avro_writer.add_user_metadata(
474 "partition-spec-id".to_string(),
475 self.metadata.partition_spec.spec_id().to_string(),
476 )?;
477 avro_writer.add_user_metadata(
478 "format-version".to_string(),
479 (self.metadata.format_version as u8).to_string(),
480 )?;
481 match self.metadata.format_version {
482 FormatVersion::V1 => {}
483 FormatVersion::V2 | FormatVersion::V3 => {
484 avro_writer
485 .add_user_metadata("content".to_string(), self.metadata.content.to_string())?;
486 }
487 }
488
489 let partition_summary = self.construct_partition_summaries(&partition_type)?;
490 for entry in std::mem::take(&mut self.manifest_entries) {
492 let value = match self.metadata.format_version {
493 FormatVersion::V1 => to_value(ManifestEntryV1::try_from(entry, &partition_type)?)?
494 .resolve(&avro_schema)?,
495 FormatVersion::V2 | FormatVersion::V3 => {
497 to_value(ManifestEntryV2::try_from(entry, &partition_type)?)?
498 .resolve(&avro_schema)?
499 }
500 };
501
502 avro_writer.append(value)?;
503 }
504
505 let content = avro_writer.into_inner()?;
506 let length = content.len();
507 let mut writer = self.writer_future.await?;
508 writer.write(Bytes::from(content)).await?;
509 writer.close().await?;
510
511 Ok(ManifestFile {
512 manifest_path: self.location,
513 manifest_length: length as i64,
514 partition_spec_id: self.metadata.partition_spec.spec_id(),
515 content: self.metadata.content,
516 sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
519 min_sequence_number: self.min_seq_num.unwrap_or(UNASSIGNED_SEQUENCE_NUMBER),
520 added_snapshot_id: self.snapshot_id.unwrap_or(UNASSIGNED_SNAPSHOT_ID),
521 added_files_count: Some(self.added_files),
522 existing_files_count: Some(self.existing_files),
523 deleted_files_count: Some(self.deleted_files),
524 added_rows_count: Some(self.added_rows),
525 existing_rows_count: Some(self.existing_rows),
526 deleted_rows_count: Some(self.deleted_rows),
527 partitions: Some(partition_summary),
528 key_metadata: self.key_metadata,
529 first_row_id: self.first_row_id,
530 })
531 }
532}
533
/// Running statistics for the values of one partition field, turned into a
/// [`FieldSummary`] by `finish`.
struct PartitionFieldStats {
    /// Primitive type every non-null value of this field must be compatible with.
    partition_type: PrimitiveType,

    /// Set once a missing (`None`) value has been observed.
    contains_null: bool,
    /// Starts as `Some(false)` and becomes `Some(true)` once a NaN value is observed.
    contains_nan: Option<bool>,
    /// Smallest non-NaN value observed so far, if any.
    lower_bound: Option<Datum>,
    /// Largest non-NaN value observed so far, if any.
    upper_bound: Option<Datum>,
}
542
543impl PartitionFieldStats {
544 pub(crate) fn new(partition_type: PrimitiveType) -> Self {
545 Self {
546 partition_type,
547 contains_null: false,
548 contains_nan: Some(false),
549 upper_bound: None,
550 lower_bound: None,
551 }
552 }
553
554 pub(crate) fn update(&mut self, value: Option<PrimitiveLiteral>) -> Result<()> {
555 let Some(value) = value else {
556 self.contains_null = true;
557 return Ok(());
558 };
559 if !self.partition_type.compatible(&value) {
560 return Err(Error::new(
561 ErrorKind::DataInvalid,
562 "value is not compatible with type",
563 ));
564 }
565 let value = Datum::new(self.partition_type.clone(), value);
566
567 if value.is_nan() {
568 self.contains_nan = Some(true);
569 return Ok(());
570 }
571
572 self.lower_bound = Some(self.lower_bound.take().map_or(value.clone(), |original| {
573 if value < original {
574 value.clone()
575 } else {
576 original
577 }
578 }));
579 self.upper_bound = Some(self.upper_bound.take().map_or(value.clone(), |original| {
580 if value > original { value } else { original }
581 }));
582
583 Ok(())
584 }
585
586 pub(crate) fn finish(self) -> FieldSummary {
587 FieldSummary {
588 contains_null: self.contains_null,
589 contains_nan: self.contains_nan,
590 upper_bound: self.upper_bound.map(|v| v.to_bytes().unwrap()),
591 lower_bound: self.lower_bound.map(|v| v.to_bytes().unwrap()),
592 }
593 }
594}
595
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::fs;
    use std::sync::Arc;

    use tempfile::TempDir;

    use super::*;
    use crate::io::FileIO;
    use crate::spec::{DataFileFormat, Manifest, NestedField, PrimitiveType, Schema, Struct, Type};

    /// Builds a schema with two optional columns: field 1 named `id` of the given
    /// primitive type, and field 2, a string column called `second_name`.
    fn two_column_schema(id_type: PrimitiveType, second_name: &str) -> Arc<Schema> {
        let id_field = NestedField::optional(1, "id", Type::Primitive(id_type));
        let second_field =
            NestedField::optional(2, second_name, Type::Primitive(PrimitiveType::String));
        Arc::new(
            Schema::builder()
                .with_fields(vec![Arc::new(id_field), Arc::new(second_field)])
                .build()
                .unwrap(),
        )
    }

    /// Builds a manifest entry for the Parquet data file shared by
    /// `test_add_delete_existing`; only the status and snapshot id vary, both
    /// sequence numbers are `Some(1)`.
    fn parquet_data_entry(status: ManifestStatus, snapshot_id: Option<i64>) -> ManifestEntry {
        let data_file = DataFile {
            content: DataContentType::Data,
            file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
            file_format: DataFileFormat::Parquet,
            partition: Struct::empty(),
            record_count: 1,
            file_size_in_bytes: 5442,
            column_sizes: HashMap::from([(1, 61), (2, 73)]),
            value_counts: HashMap::from([(1, 1), (2, 1)]),
            null_value_counts: HashMap::from([(1, 0), (2, 0)]),
            nan_value_counts: HashMap::new(),
            lower_bounds: HashMap::new(),
            upper_bounds: HashMap::new(),
            key_metadata: Some(Vec::new()),
            split_offsets: Some(vec![4]),
            equality_ids: None,
            sort_order_id: None,
            partition_spec_id: 0,
            first_row_id: None,
            referenced_data_file: None,
            content_offset: None,
            content_size_in_bytes: None,
        };
        ManifestEntry {
            status,
            snapshot_id,
            sequence_number: Some(1),
            file_sequence_number: Some(1),
            data_file,
        }
    }

    #[tokio::test]
    async fn test_add_delete_existing() {
        let schema = two_column_schema(PrimitiveType::Int, "name");
        let metadata = ManifestMetadata {
            schema_id: 0,
            schema: schema.clone(),
            partition_spec: PartitionSpec::builder(schema)
                .with_spec_id(0)
                .build()
                .unwrap(),
            content: ManifestContentType::Data,
            format_version: FormatVersion::V2,
        };
        let mut entries = vec![
            parquet_data_entry(ManifestStatus::Added, None),
            parquet_data_entry(ManifestStatus::Deleted, Some(1)),
            parquet_data_entry(ManifestStatus::Existing, Some(1)),
        ];

        // Write one entry of each kind through a V2 data manifest writer.
        let tmp_dir = TempDir::new().unwrap();
        let path = tmp_dir.path().join("test_manifest.avro");
        let io = FileIO::new_with_fs();
        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
        let mut writer = ManifestWriterBuilder::new(
            output_file,
            Some(3),
            None,
            metadata.schema.clone(),
            metadata.partition_spec.clone(),
        )
        .build_v2_data();
        writer.add_entry(entries[0].clone()).unwrap();
        writer.add_delete_entry(entries[1].clone()).unwrap();
        writer.add_existing_entry(entries[2].clone()).unwrap();
        writer.write_manifest_file().await.unwrap();

        // Read the manifest back from disk.
        let actual_manifest =
            Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
                .unwrap();

        // Added and deleted entries are stamped with the writer's snapshot id, and the
        // added entry loses its file sequence number; the existing entry is untouched.
        entries[0].snapshot_id = Some(3);
        entries[1].snapshot_id = Some(3);
        entries[0].file_sequence_number = None;
        assert_eq!(actual_manifest, Manifest::new(metadata, entries));
    }

    #[tokio::test]
    async fn test_v3_delete_manifest_delete_file_roundtrip() {
        let schema = two_column_schema(PrimitiveType::Long, "data");

        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .build()
            .unwrap();

        // A position-delete file without any sequence numbers assigned yet.
        let delete_file = DataFile {
            content: DataContentType::PositionDeletes,
            file_path: "s3://bucket/table/data/delete-00000.parquet".to_string(),
            file_format: DataFileFormat::Parquet,
            partition: Struct::empty(),
            record_count: 10,
            file_size_in_bytes: 1024,
            column_sizes: HashMap::new(),
            value_counts: HashMap::new(),
            null_value_counts: HashMap::new(),
            nan_value_counts: HashMap::new(),
            lower_bounds: HashMap::new(),
            upper_bounds: HashMap::new(),
            key_metadata: None,
            split_offsets: None,
            equality_ids: None,
            sort_order_id: None,
            partition_spec_id: 0,
            first_row_id: None,
            referenced_data_file: None,
            content_offset: None,
            content_size_in_bytes: None,
        };
        let delete_entry = ManifestEntry {
            status: ManifestStatus::Added,
            snapshot_id: None,
            sequence_number: None,
            file_sequence_number: None,
            data_file: delete_file,
        };

        let tmp_dir = TempDir::new().unwrap();
        let path = tmp_dir.path().join("v3_delete_manifest.avro");
        let io = FileIO::new_with_fs();
        let output_file = io.new_output(path.to_str().unwrap()).unwrap();

        let mut writer = ManifestWriterBuilder::new(
            output_file,
            Some(1),
            None,
            schema.clone(),
            partition_spec.clone(),
        )
        .build_v3_deletes();

        writer.add_entry(delete_entry).unwrap();
        let manifest_file = writer.write_manifest_file().await.unwrap();

        // The returned manifest file descriptor reports delete content ...
        assert_eq!(manifest_file.content, ManifestContentType::Deletes);

        // ... and so does the metadata parsed back from the written Avro file.
        let actual_manifest =
            Manifest::parse_avro(fs::read(&path).expect("read_file must succeed").as_slice())
                .unwrap();

        assert_eq!(
            actual_manifest.metadata().content,
            ManifestContentType::Deletes,
        );
    }
}