1use std::collections::HashMap;
21use std::str::FromStr;
22
23use apache_avro::types::Value;
24use apache_avro::{Reader, Writer, from_value};
25use bytes::Bytes;
26pub use serde_bytes::ByteBuf;
27use serde_derive::{Deserialize, Serialize};
28
29use self::_const_schema::{MANIFEST_LIST_AVRO_SCHEMA_V1, MANIFEST_LIST_AVRO_SCHEMA_V2};
30use self::_serde::{ManifestFileV1, ManifestFileV2};
31use super::{FormatVersion, Manifest};
32use crate::error::Result;
33use crate::io::{FileIO, OutputFile};
34use crate::spec::manifest_list::_const_schema::MANIFEST_LIST_AVRO_SCHEMA_V3;
35use crate::spec::manifest_list::_serde::ManifestFileV3;
36use crate::{Error, ErrorKind};
37
/// Placeholder marking a sequence number that has not been assigned yet.
///
/// When a V2/V3 [`ManifestListWriter`] adds a manifest whose `sequence_number` or
/// `min_sequence_number` equals this value, it substitutes its own sequence number.
pub const UNASSIGNED_SEQUENCE_NUMBER: i64 = -1;
40
/// In-memory form of a manifest list: the collection of [`ManifestFile`] entries it contains.
#[derive(Debug, Clone, PartialEq)]
pub struct ManifestList {
    /// The manifest file entries held by this list.
    entries: Vec<ManifestFile>,
}
59
60impl ManifestList {
61 pub fn parse_with_version(bs: &[u8], version: FormatVersion) -> Result<ManifestList> {
63 match version {
64 FormatVersion::V1 => {
65 let reader = Reader::with_schema(&MANIFEST_LIST_AVRO_SCHEMA_V1, bs)?;
66 let values = Value::Array(reader.collect::<std::result::Result<Vec<Value>, _>>()?);
67 from_value::<_serde::ManifestListV1>(&values)?.try_into()
68 }
69 FormatVersion::V2 => {
70 let reader = Reader::new(bs)?;
71 let values = Value::Array(reader.collect::<std::result::Result<Vec<Value>, _>>()?);
72 from_value::<_serde::ManifestListV2>(&values)?.try_into()
73 }
74 FormatVersion::V3 => {
75 let reader = Reader::new(bs)?;
76 let values = Value::Array(reader.collect::<std::result::Result<Vec<Value>, _>>()?);
77 from_value::<_serde::ManifestListV3>(&values)?.try_into()
78 }
79 }
80 }
81
82 pub fn entries(&self) -> &[ManifestFile] {
84 &self.entries
85 }
86
87 pub fn consume_entries(self) -> impl IntoIterator<Item = ManifestFile> {
89 Box::new(self.entries.into_iter())
90 }
91}
92
/// Writer that buffers manifest-list entries as Avro in memory and writes them to an
/// [`OutputFile`] when closed.
pub struct ManifestListWriter {
    /// Format version; selects the Avro schema and the per-entry serde representation.
    format_version: FormatVersion,
    /// Destination that receives the encoded bytes in `close`.
    output_file: OutputFile,
    /// Avro writer backed by an in-memory `Vec<u8>` buffer.
    avro_writer: Writer<'static, Vec<u8>>,
    /// Sequence number substituted for `UNASSIGNED_SEQUENCE_NUMBER` in added manifests.
    sequence_number: i64,
    /// Snapshot id of the writer; manifests with unassigned sequence numbers must have
    /// been added by this snapshot.
    snapshot_id: i64,
    /// Next `first_row_id` to hand out to data manifests lacking one (`None` for V1/V2 writers).
    next_row_id: Option<u64>,
}
102
103impl std::fmt::Debug for ManifestListWriter {
104 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105 f.debug_struct("ManifestListWriter")
106 .field("format_version", &self.format_version)
107 .field("output_file", &self.output_file)
108 .field("avro_writer", &self.avro_writer.schema())
109 .finish_non_exhaustive()
110 }
111}
112
113impl ManifestListWriter {
114 pub fn next_row_id(&self) -> Option<u64> {
116 self.next_row_id
117 }
118
119 pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: Option<i64>) -> Self {
121 let mut metadata = HashMap::from_iter([
122 ("snapshot-id".to_string(), snapshot_id.to_string()),
123 ("format-version".to_string(), "1".to_string()),
124 ]);
125 if let Some(parent_snapshot_id) = parent_snapshot_id {
126 metadata.insert(
127 "parent-snapshot-id".to_string(),
128 parent_snapshot_id.to_string(),
129 );
130 }
131 Self::new(
132 FormatVersion::V1,
133 output_file,
134 metadata,
135 0,
136 snapshot_id,
137 None,
138 )
139 }
140
141 pub fn v2(
143 output_file: OutputFile,
144 snapshot_id: i64,
145 parent_snapshot_id: Option<i64>,
146 sequence_number: i64,
147 ) -> Self {
148 let mut metadata = HashMap::from_iter([
149 ("snapshot-id".to_string(), snapshot_id.to_string()),
150 ("sequence-number".to_string(), sequence_number.to_string()),
151 ("format-version".to_string(), "2".to_string()),
152 ]);
153 metadata.insert(
154 "parent-snapshot-id".to_string(),
155 parent_snapshot_id
156 .map(|v| v.to_string())
157 .unwrap_or("null".to_string()),
158 );
159 Self::new(
160 FormatVersion::V2,
161 output_file,
162 metadata,
163 sequence_number,
164 snapshot_id,
165 None,
166 )
167 }
168
169 pub fn v3(
171 output_file: OutputFile,
172 snapshot_id: i64,
173 parent_snapshot_id: Option<i64>,
174 sequence_number: i64,
175 first_row_id: Option<u64>, ) -> Self {
177 let mut metadata = HashMap::from_iter([
178 ("snapshot-id".to_string(), snapshot_id.to_string()),
179 ("sequence-number".to_string(), sequence_number.to_string()),
180 ("format-version".to_string(), "3".to_string()),
181 ]);
182 metadata.insert(
183 "parent-snapshot-id".to_string(),
184 parent_snapshot_id
185 .map(|v| v.to_string())
186 .unwrap_or("null".to_string()),
187 );
188 metadata.insert(
189 "first-row-id".to_string(),
190 first_row_id
191 .map(|v| v.to_string())
192 .unwrap_or("null".to_string()),
193 );
194 Self::new(
195 FormatVersion::V3,
196 output_file,
197 metadata,
198 sequence_number,
199 snapshot_id,
200 first_row_id,
201 )
202 }
203
204 fn new(
205 format_version: FormatVersion,
206 output_file: OutputFile,
207 metadata: HashMap<String, String>,
208 sequence_number: i64,
209 snapshot_id: i64,
210 first_row_id: Option<u64>,
211 ) -> Self {
212 let avro_schema = match format_version {
213 FormatVersion::V1 => &MANIFEST_LIST_AVRO_SCHEMA_V1,
214 FormatVersion::V2 => &MANIFEST_LIST_AVRO_SCHEMA_V2,
215 FormatVersion::V3 => &MANIFEST_LIST_AVRO_SCHEMA_V3,
216 };
217 let mut avro_writer = Writer::new(avro_schema, Vec::new());
218 for (key, value) in metadata {
219 avro_writer
220 .add_user_metadata(key, value)
221 .expect("Avro metadata should be added to the writer before the first record.");
222 }
223 Self {
224 format_version,
225 output_file,
226 avro_writer,
227 sequence_number,
228 snapshot_id,
229 next_row_id: first_row_id,
230 }
231 }
232
233 pub fn add_manifests(&mut self, manifests: impl Iterator<Item = ManifestFile>) -> Result<()> {
239 match self.format_version {
240 FormatVersion::V1 => {
241 for manifest in manifests {
242 let manifests: ManifestFileV1 = manifest.try_into()?;
243 self.avro_writer.append_ser(manifests)?;
244 }
245 }
246 FormatVersion::V2 | FormatVersion::V3 => {
247 for mut manifest in manifests {
248 self.assign_sequence_numbers(&mut manifest)?;
249
250 if self.format_version == FormatVersion::V2 {
251 let manifest_entry: ManifestFileV2 = manifest.try_into()?;
252 self.avro_writer.append_ser(manifest_entry)?;
253 } else if self.format_version == FormatVersion::V3 {
254 self.assign_first_row_id(&mut manifest)?;
255 let manifest_entry: ManifestFileV3 = manifest.try_into()?;
256 self.avro_writer.append_ser(manifest_entry)?;
257 }
258 }
259 }
260 }
261 Ok(())
262 }
263
264 pub async fn close(self) -> Result<()> {
266 let data = self.avro_writer.into_inner()?;
267 let mut writer = self.output_file.writer().await?;
268 writer.write(Bytes::from(data)).await?;
269 writer.close().await?;
270 Ok(())
271 }
272
273 fn assign_sequence_numbers(&self, manifest: &mut ManifestFile) -> Result<()> {
275 if manifest.sequence_number == UNASSIGNED_SEQUENCE_NUMBER {
276 if manifest.added_snapshot_id != self.snapshot_id {
277 return Err(Error::new(
278 ErrorKind::DataInvalid,
279 format!(
280 "Found unassigned sequence number for a manifest from snapshot {}.",
281 manifest.added_snapshot_id
282 ),
283 ));
284 }
285 manifest.sequence_number = self.sequence_number;
286 }
287
288 if manifest.min_sequence_number == UNASSIGNED_SEQUENCE_NUMBER {
289 if manifest.added_snapshot_id != self.snapshot_id {
290 return Err(Error::new(
291 ErrorKind::DataInvalid,
292 format!(
293 "Found unassigned sequence number for a manifest from snapshot {}.",
294 manifest.added_snapshot_id
295 ),
296 ));
297 }
298 manifest.min_sequence_number = self.sequence_number;
299 }
300
301 Ok(())
302 }
303
304 fn assign_first_row_id(&mut self, manifest: &mut ManifestFile) -> Result<()> {
306 match manifest.content {
307 ManifestContentType::Data => {
308 match (self.next_row_id, manifest.first_row_id) {
309 (Some(_), Some(_)) => {
310 }
313 (None, Some(manifest_first_row_id)) => {
314 return Err(Error::new(
316 ErrorKind::Unexpected,
317 format!(
318 "Found invalid first-row-id assignment for Manifest {}. Writer does not have a next-row-id assigned, but the manifest has first-row-id assigned to {}.",
319 manifest.manifest_path, manifest_first_row_id,
320 ),
321 ));
322 }
323 (Some(writer_next_row_id), None) => {
324 let (existing_rows_count, added_rows_count) =
327 require_row_counts_in_manifest(manifest)?;
328 manifest.first_row_id = Some(writer_next_row_id);
329
330 self.next_row_id = writer_next_row_id
331 .checked_add(existing_rows_count)
332 .and_then(|sum| sum.checked_add(added_rows_count))
333 .ok_or_else(|| {
334 Error::new(
335 ErrorKind::DataInvalid,
336 format!(
337 "Row ID overflow when computing next row ID for Manifest {}. Next Row ID: {writer_next_row_id}, Existing Rows Count: {existing_rows_count}, Added Rows Count: {added_rows_count}",
338 manifest.manifest_path
339 ),
340 )
341 }).map(Some)?;
342 }
343 (None, None) => {
344 }
346 }
347 }
348 ManifestContentType::Deletes => {
349 manifest.first_row_id = None;
351 }
352 };
353
354 Ok(())
355 }
356}
357
358fn require_row_counts_in_manifest(manifest: &ManifestFile) -> Result<(u64, u64)> {
359 let existing_rows_count = manifest.existing_rows_count.ok_or_else(|| {
360 Error::new(
361 ErrorKind::DataInvalid,
362 format!(
363 "Cannot include a Manifest without existing-rows-count to a table with row lineage enabled. Manifest path: {}",
364 manifest.manifest_path,
365 ),
366 )
367 })?;
368 let added_rows_count = manifest.added_rows_count.ok_or_else(|| {
369 Error::new(
370 ErrorKind::DataInvalid,
371 format!(
372 "Cannot include a Manifest without added-rows-count to a table with row lineage enabled. Manifest path: {}",
373 manifest.manifest_path,
374 ),
375 )
376 })?;
377 Ok((existing_rows_count, added_rows_count))
378}
379
380mod _const_schema {
382 use std::sync::Arc;
383
384 use apache_avro::Schema as AvroSchema;
385 use once_cell::sync::Lazy;
386
387 use crate::avro::schema_to_avro_schema;
388 use crate::spec::{
389 ListType, NestedField, NestedFieldRef, PrimitiveType, Schema, StructType, Type,
390 };
391
392 static MANIFEST_PATH: Lazy<NestedFieldRef> = {
393 Lazy::new(|| {
394 Arc::new(NestedField::required(
395 500,
396 "manifest_path",
397 Type::Primitive(PrimitiveType::String),
398 ))
399 })
400 };
401 static MANIFEST_LENGTH: Lazy<NestedFieldRef> = {
402 Lazy::new(|| {
403 Arc::new(NestedField::required(
404 501,
405 "manifest_length",
406 Type::Primitive(PrimitiveType::Long),
407 ))
408 })
409 };
410 static PARTITION_SPEC_ID: Lazy<NestedFieldRef> = {
411 Lazy::new(|| {
412 Arc::new(NestedField::required(
413 502,
414 "partition_spec_id",
415 Type::Primitive(PrimitiveType::Int),
416 ))
417 })
418 };
419 static CONTENT: Lazy<NestedFieldRef> = {
420 Lazy::new(|| {
421 Arc::new(NestedField::required(
422 517,
423 "content",
424 Type::Primitive(PrimitiveType::Int),
425 ))
426 })
427 };
428 static SEQUENCE_NUMBER: Lazy<NestedFieldRef> = {
429 Lazy::new(|| {
430 Arc::new(NestedField::required(
431 515,
432 "sequence_number",
433 Type::Primitive(PrimitiveType::Long),
434 ))
435 })
436 };
437 static MIN_SEQUENCE_NUMBER: Lazy<NestedFieldRef> = {
438 Lazy::new(|| {
439 Arc::new(NestedField::required(
440 516,
441 "min_sequence_number",
442 Type::Primitive(PrimitiveType::Long),
443 ))
444 })
445 };
446 static ADDED_SNAPSHOT_ID: Lazy<NestedFieldRef> = {
447 Lazy::new(|| {
448 Arc::new(NestedField::required(
449 503,
450 "added_snapshot_id",
451 Type::Primitive(PrimitiveType::Long),
452 ))
453 })
454 };
455 static ADDED_FILES_COUNT_V2: Lazy<NestedFieldRef> = {
456 Lazy::new(|| {
457 Arc::new(NestedField::required(
458 504,
459 "added_files_count",
460 Type::Primitive(PrimitiveType::Int),
461 ))
462 })
463 };
464 static ADDED_FILES_COUNT_V1: Lazy<NestedFieldRef> = {
465 Lazy::new(|| {
466 Arc::new(NestedField::optional(
467 504,
468 "added_data_files_count",
469 Type::Primitive(PrimitiveType::Int),
470 ))
471 })
472 };
473 static EXISTING_FILES_COUNT_V2: Lazy<NestedFieldRef> = {
474 Lazy::new(|| {
475 Arc::new(NestedField::required(
476 505,
477 "existing_files_count",
478 Type::Primitive(PrimitiveType::Int),
479 ))
480 })
481 };
482 static EXISTING_FILES_COUNT_V1: Lazy<NestedFieldRef> = {
483 Lazy::new(|| {
484 Arc::new(NestedField::optional(
485 505,
486 "existing_data_files_count",
487 Type::Primitive(PrimitiveType::Int),
488 ))
489 })
490 };
491 static DELETED_FILES_COUNT_V2: Lazy<NestedFieldRef> = {
492 Lazy::new(|| {
493 Arc::new(NestedField::required(
494 506,
495 "deleted_files_count",
496 Type::Primitive(PrimitiveType::Int),
497 ))
498 })
499 };
500 static DELETED_FILES_COUNT_V1: Lazy<NestedFieldRef> = {
501 Lazy::new(|| {
502 Arc::new(NestedField::optional(
503 506,
504 "deleted_data_files_count",
505 Type::Primitive(PrimitiveType::Int),
506 ))
507 })
508 };
509 static ADDED_ROWS_COUNT_V2: Lazy<NestedFieldRef> = {
510 Lazy::new(|| {
511 Arc::new(NestedField::required(
512 512,
513 "added_rows_count",
514 Type::Primitive(PrimitiveType::Long),
515 ))
516 })
517 };
518 static ADDED_ROWS_COUNT_V1: Lazy<NestedFieldRef> = {
519 Lazy::new(|| {
520 Arc::new(NestedField::optional(
521 512,
522 "added_rows_count",
523 Type::Primitive(PrimitiveType::Long),
524 ))
525 })
526 };
527 static EXISTING_ROWS_COUNT_V2: Lazy<NestedFieldRef> = {
528 Lazy::new(|| {
529 Arc::new(NestedField::required(
530 513,
531 "existing_rows_count",
532 Type::Primitive(PrimitiveType::Long),
533 ))
534 })
535 };
536 static EXISTING_ROWS_COUNT_V1: Lazy<NestedFieldRef> = {
537 Lazy::new(|| {
538 Arc::new(NestedField::optional(
539 513,
540 "existing_rows_count",
541 Type::Primitive(PrimitiveType::Long),
542 ))
543 })
544 };
545 static DELETED_ROWS_COUNT_V2: Lazy<NestedFieldRef> = {
546 Lazy::new(|| {
547 Arc::new(NestedField::required(
548 514,
549 "deleted_rows_count",
550 Type::Primitive(PrimitiveType::Long),
551 ))
552 })
553 };
554 static DELETED_ROWS_COUNT_V1: Lazy<NestedFieldRef> = {
555 Lazy::new(|| {
556 Arc::new(NestedField::optional(
557 514,
558 "deleted_rows_count",
559 Type::Primitive(PrimitiveType::Long),
560 ))
561 })
562 };
563 static PARTITIONS: Lazy<NestedFieldRef> = {
564 Lazy::new(|| {
565 let fields = vec![
567 Arc::new(NestedField::required(
568 509,
569 "contains_null",
570 Type::Primitive(PrimitiveType::Boolean),
571 )),
572 Arc::new(NestedField::optional(
573 518,
574 "contains_nan",
575 Type::Primitive(PrimitiveType::Boolean),
576 )),
577 Arc::new(NestedField::optional(
578 510,
579 "lower_bound",
580 Type::Primitive(PrimitiveType::Binary),
581 )),
582 Arc::new(NestedField::optional(
583 511,
584 "upper_bound",
585 Type::Primitive(PrimitiveType::Binary),
586 )),
587 ];
588 let element_field = Arc::new(NestedField::required(
589 508,
590 "r_508",
591 Type::Struct(StructType::new(fields)),
592 ));
593 Arc::new(NestedField::optional(
594 507,
595 "partitions",
596 Type::List(ListType { element_field }),
597 ))
598 })
599 };
600 static KEY_METADATA: Lazy<NestedFieldRef> = {
601 Lazy::new(|| {
602 Arc::new(NestedField::optional(
603 519,
604 "key_metadata",
605 Type::Primitive(PrimitiveType::Binary),
606 ))
607 })
608 };
609 static FIRST_ROW_ID: Lazy<NestedFieldRef> = {
610 Lazy::new(|| {
611 Arc::new(NestedField::optional(
612 520,
613 "first_row_id",
614 Type::Primitive(PrimitiveType::Long),
615 ))
616 })
617 };
618
619 static V1_SCHEMA: Lazy<Schema> = {
620 Lazy::new(|| {
621 let fields = vec![
622 MANIFEST_PATH.clone(),
623 MANIFEST_LENGTH.clone(),
624 PARTITION_SPEC_ID.clone(),
625 ADDED_SNAPSHOT_ID.clone(),
626 ADDED_FILES_COUNT_V1.clone().to_owned(),
627 EXISTING_FILES_COUNT_V1.clone(),
628 DELETED_FILES_COUNT_V1.clone(),
629 ADDED_ROWS_COUNT_V1.clone(),
630 EXISTING_ROWS_COUNT_V1.clone(),
631 DELETED_ROWS_COUNT_V1.clone(),
632 PARTITIONS.clone(),
633 KEY_METADATA.clone(),
634 ];
635 Schema::builder().with_fields(fields).build().unwrap()
636 })
637 };
638
639 static V2_SCHEMA: Lazy<Schema> = {
640 Lazy::new(|| {
641 let fields = vec![
642 MANIFEST_PATH.clone(),
643 MANIFEST_LENGTH.clone(),
644 PARTITION_SPEC_ID.clone(),
645 CONTENT.clone(),
646 SEQUENCE_NUMBER.clone(),
647 MIN_SEQUENCE_NUMBER.clone(),
648 ADDED_SNAPSHOT_ID.clone(),
649 ADDED_FILES_COUNT_V2.clone(),
650 EXISTING_FILES_COUNT_V2.clone(),
651 DELETED_FILES_COUNT_V2.clone(),
652 ADDED_ROWS_COUNT_V2.clone(),
653 EXISTING_ROWS_COUNT_V2.clone(),
654 DELETED_ROWS_COUNT_V2.clone(),
655 PARTITIONS.clone(),
656 KEY_METADATA.clone(),
657 ];
658 Schema::builder().with_fields(fields).build().unwrap()
659 })
660 };
661
662 static V3_SCHEMA: Lazy<Schema> = {
663 Lazy::new(|| {
664 let fields = vec![
665 MANIFEST_PATH.clone(),
666 MANIFEST_LENGTH.clone(),
667 PARTITION_SPEC_ID.clone(),
668 CONTENT.clone(),
669 SEQUENCE_NUMBER.clone(),
670 MIN_SEQUENCE_NUMBER.clone(),
671 ADDED_SNAPSHOT_ID.clone(),
672 ADDED_FILES_COUNT_V2.clone(),
673 EXISTING_FILES_COUNT_V2.clone(),
674 DELETED_FILES_COUNT_V2.clone(),
675 ADDED_ROWS_COUNT_V2.clone(),
676 EXISTING_ROWS_COUNT_V2.clone(),
677 DELETED_ROWS_COUNT_V2.clone(),
678 PARTITIONS.clone(),
679 KEY_METADATA.clone(),
680 FIRST_ROW_ID.clone(),
681 ];
682 Schema::builder().with_fields(fields).build().unwrap()
683 })
684 };
685
686 pub(super) static MANIFEST_LIST_AVRO_SCHEMA_V1: Lazy<AvroSchema> =
687 Lazy::new(|| schema_to_avro_schema("manifest_file", &V1_SCHEMA).unwrap());
688
689 pub(super) static MANIFEST_LIST_AVRO_SCHEMA_V2: Lazy<AvroSchema> =
690 Lazy::new(|| schema_to_avro_schema("manifest_file", &V2_SCHEMA).unwrap());
691
692 pub(super) static MANIFEST_LIST_AVRO_SCHEMA_V3: Lazy<AvroSchema> =
693 Lazy::new(|| schema_to_avro_schema("manifest_file", &V3_SCHEMA).unwrap());
694}
695
/// One entry of a manifest list, describing a single manifest file.
///
/// The field ids mentioned below are the ones used by the manifest-list schema defined in
/// this file.
#[derive(Debug, PartialEq, Clone, Eq, Hash)]
pub struct ManifestFile {
    /// Field id 500: path of the manifest file; this is what `load_manifest` reads.
    pub manifest_path: String,
    /// Field id 501: length of the manifest file.
    pub manifest_length: i64,
    /// Field id 502: id of the partition spec associated with the manifest.
    pub partition_spec_id: i32,
    /// Field id 517: kind of content tracked by the manifest (always `Data` when read from V1).
    pub content: ManifestContentType,
    /// Field id 515: sequence number; may be [`UNASSIGNED_SEQUENCE_NUMBER`] before writing
    /// (0 when read from V1).
    pub sequence_number: i64,
    /// Field id 516: minimum sequence number; may be [`UNASSIGNED_SEQUENCE_NUMBER`] before
    /// writing (0 when read from V1).
    pub min_sequence_number: i64,
    /// Field id 503: id of the snapshot that added the manifest.
    pub added_snapshot_id: i64,
    /// Field id 504: added-files count; optional in V1, required when writing V2/V3.
    pub added_files_count: Option<u32>,
    /// Field id 505: existing-files count; optional in V1, required when writing V2/V3.
    pub existing_files_count: Option<u32>,
    /// Field id 506: deleted-files count; optional in V1, required when writing V2/V3.
    pub deleted_files_count: Option<u32>,
    /// Field id 512: added-rows count; optional in V1, required when writing V2/V3.
    pub added_rows_count: Option<u64>,
    /// Field id 513: existing-rows count; optional in V1, required when writing V2/V3.
    pub existing_rows_count: Option<u64>,
    /// Field id 514: deleted-rows count; optional in V1, required when writing V2/V3.
    pub deleted_rows_count: Option<u64>,
    /// Field id 507: optional list of per-partition-field summaries.
    pub partitions: Option<Vec<FieldSummary>>,
    /// Field id 519: optional key metadata bytes; an empty vector is written as absent.
    pub key_metadata: Option<Vec<u8>>,
    /// Field id 520: first row id of the manifest (V3 only; `None` when read from V1/V2).
    pub first_row_id: Option<u64>,
}
777
778impl ManifestFile {
779 pub fn has_added_files(&self) -> bool {
781 self.added_files_count.map(|c| c > 0).unwrap_or(true)
782 }
783
784 pub fn has_deleted_files(&self) -> bool {
786 self.deleted_files_count.map(|c| c > 0).unwrap_or(true)
787 }
788
789 pub fn has_existing_files(&self) -> bool {
791 self.existing_files_count.map(|c| c > 0).unwrap_or(true)
792 }
793}
794
/// Kind of content a manifest tracks.
///
/// The discriminants are the integer codes stored in the `content` field of V2/V3
/// manifest-list entries.
#[derive(Debug, PartialEq, Clone, Copy, Eq, Hash, Default)]
pub enum ManifestContentType {
    /// Data manifest (code 0, textual form "data"); this is the default.
    #[default]
    Data = 0,
    /// Delete manifest (code 1, textual form "deletes").
    Deletes = 1,
}
804
805impl FromStr for ManifestContentType {
806 type Err = Error;
807
808 fn from_str(s: &str) -> Result<Self> {
809 match s {
810 "data" => Ok(ManifestContentType::Data),
811 "deletes" => Ok(ManifestContentType::Deletes),
812 _ => Err(Error::new(
813 ErrorKind::DataInvalid,
814 format!("Invalid manifest content type: {s}"),
815 )),
816 }
817 }
818}
819
820impl std::fmt::Display for ManifestContentType {
821 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
822 match self {
823 ManifestContentType::Data => write!(f, "data"),
824 ManifestContentType::Deletes => write!(f, "deletes"),
825 }
826 }
827}
828
829impl TryFrom<i32> for ManifestContentType {
830 type Error = Error;
831
832 fn try_from(value: i32) -> std::result::Result<Self, Self::Error> {
833 match value {
834 0 => Ok(ManifestContentType::Data),
835 1 => Ok(ManifestContentType::Deletes),
836 _ => Err(Error::new(
837 crate::ErrorKind::DataInvalid,
838 format!("Invalid manifest content type. Expected 0 or 1, got {value}"),
839 )),
840 }
841 }
842}
843
844impl ManifestFile {
845 pub async fn load_manifest(&self, file_io: &FileIO) -> Result<Manifest> {
849 let avro = file_io.new_input(&self.manifest_path)?.read().await?;
850
851 let (metadata, mut entries) = Manifest::try_from_avro_bytes(&avro)?;
852
853 for entry in &mut entries {
855 entry.inherit_data(self);
856 }
857
858 Ok(Manifest::new(metadata, entries))
859 }
860}
861
/// Summary of one partition field, stored as an element of a manifest entry's `partitions`
/// list.
///
/// The struct is (de)serialized with serde, so the field names are part of the on-disk
/// representation.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Hash)]
pub struct FieldSummary {
    /// Field id 509 (required boolean).
    pub contains_null: bool,
    /// Field id 518 (optional boolean).
    pub contains_nan: Option<bool>,
    /// Field id 510 (optional binary): serialized lower bound.
    pub lower_bound: Option<ByteBuf>,
    /// Field id 511 (optional binary): serialized upper bound.
    pub upper_bound: Option<ByteBuf>,
}
885
886pub(super) mod _serde {
891 pub use serde_bytes::ByteBuf;
892 use serde_derive::{Deserialize, Serialize};
893
894 use super::ManifestFile;
895 use crate::Error;
896 use crate::error::Result;
897 use crate::spec::FieldSummary;
898
    /// Serde form of a V3 manifest list; `transparent` makes it (de)serialize as the bare
    /// sequence of [`ManifestFileV3`] records.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(transparent)]
    pub(crate) struct ManifestListV3 {
        entries: Vec<ManifestFileV3>,
    }
904
    /// Serde form of a V2 manifest list; `transparent` makes it (de)serialize as the bare
    /// sequence of [`ManifestFileV2`] records.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(transparent)]
    pub(crate) struct ManifestListV2 {
        entries: Vec<ManifestFileV2>,
    }
910
    /// Serde form of a V1 manifest list; `transparent` makes it (de)serialize as the bare
    /// sequence of [`ManifestFileV1`] records.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    #[serde(transparent)]
    pub(crate) struct ManifestListV1 {
        entries: Vec<ManifestFileV1>,
    }
916
917 impl ManifestListV3 {
918 pub fn try_into(self) -> Result<super::ManifestList> {
920 Ok(super::ManifestList {
921 entries: self
922 .entries
923 .into_iter()
924 .map(|v| v.try_into())
925 .collect::<Result<Vec<_>>>()?,
926 })
927 }
928 }
929
930 impl TryFrom<super::ManifestList> for ManifestListV3 {
931 type Error = Error;
932
933 fn try_from(value: super::ManifestList) -> std::result::Result<Self, Self::Error> {
934 Ok(Self {
935 entries: value
936 .entries
937 .into_iter()
938 .map(|v| v.try_into())
939 .collect::<std::result::Result<Vec<_>, _>>()?,
940 })
941 }
942 }
943
944 impl ManifestListV2 {
945 pub fn try_into(self) -> Result<super::ManifestList> {
947 Ok(super::ManifestList {
948 entries: self
949 .entries
950 .into_iter()
951 .map(|v| v.try_into())
952 .collect::<Result<Vec<_>>>()?,
953 })
954 }
955 }
956
957 impl TryFrom<super::ManifestList> for ManifestListV2 {
958 type Error = Error;
959
960 fn try_from(value: super::ManifestList) -> std::result::Result<Self, Self::Error> {
961 Ok(Self {
962 entries: value
963 .entries
964 .into_iter()
965 .map(|v| v.try_into())
966 .collect::<std::result::Result<Vec<_>, _>>()?,
967 })
968 }
969 }
970
971 impl ManifestListV1 {
972 pub fn try_into(self) -> Result<super::ManifestList> {
974 Ok(super::ManifestList {
975 entries: self
976 .entries
977 .into_iter()
978 .map(|v| v.try_into())
979 .collect::<Result<Vec<_>>>()?,
980 })
981 }
982 }
983
984 impl TryFrom<super::ManifestList> for ManifestListV1 {
985 type Error = Error;
986
987 fn try_from(value: super::ManifestList) -> std::result::Result<Self, Self::Error> {
988 Ok(Self {
989 entries: value
990 .entries
991 .into_iter()
992 .map(|v| v.try_into())
993 .collect::<std::result::Result<Vec<_>, _>>()?,
994 })
995 }
996 }
997
    /// Serde form of a V1 manifest-list record.
    ///
    /// Field names follow the V1 schema (`*_data_files_count`) and all counts are optional.
    /// There are no `content`, sequence-number or `first_row_id` fields in this version.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    pub(super) struct ManifestFileV1 {
        pub manifest_path: String,
        pub manifest_length: i64,
        pub partition_spec_id: i32,
        pub added_snapshot_id: i64,
        pub added_data_files_count: Option<i32>,
        pub existing_data_files_count: Option<i32>,
        pub deleted_data_files_count: Option<i32>,
        pub added_rows_count: Option<i64>,
        pub existing_rows_count: Option<i64>,
        pub deleted_rows_count: Option<i64>,
        pub partitions: Option<Vec<FieldSummary>>,
        pub key_metadata: Option<ByteBuf>,
    }
1013
    /// Serde form of a V2 manifest-list record.
    ///
    /// `content`, `sequence_number` and `min_sequence_number` fall back to the
    /// `v2_default_*_for_v1` functions when missing from the input, and the file-count
    /// fields also accept their V1 names through serde aliases.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    pub(super) struct ManifestFileV2 {
        pub manifest_path: String,
        pub manifest_length: i64,
        pub partition_spec_id: i32,
        // Integer code of `ManifestContentType`.
        #[serde(default = "v2_default_content_for_v1")]
        pub content: i32,
        #[serde(default = "v2_default_sequence_number_for_v1")]
        pub sequence_number: i64,
        #[serde(default = "v2_default_min_sequence_number_for_v1")]
        pub min_sequence_number: i64,
        pub added_snapshot_id: i64,
        #[serde(alias = "added_data_files_count", alias = "added_files_count")]
        pub added_files_count: i32,
        #[serde(alias = "existing_data_files_count", alias = "existing_files_count")]
        pub existing_files_count: i32,
        #[serde(alias = "deleted_data_files_count", alias = "deleted_files_count")]
        pub deleted_files_count: i32,
        pub added_rows_count: i64,
        pub existing_rows_count: i64,
        pub deleted_rows_count: i64,
        pub partitions: Option<Vec<FieldSummary>>,
        pub key_metadata: Option<ByteBuf>,
    }
1041
    /// Serde form of a V3 manifest-list record.
    ///
    /// Same layout, defaults and aliases as [`ManifestFileV2`], plus the optional
    /// `first_row_id`.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    pub(super) struct ManifestFileV3 {
        pub manifest_path: String,
        pub manifest_length: i64,
        pub partition_spec_id: i32,
        // Integer code of `ManifestContentType`.
        #[serde(default = "v2_default_content_for_v1")]
        pub content: i32,
        #[serde(default = "v2_default_sequence_number_for_v1")]
        pub sequence_number: i64,
        #[serde(default = "v2_default_min_sequence_number_for_v1")]
        pub min_sequence_number: i64,
        pub added_snapshot_id: i64,
        #[serde(alias = "added_data_files_count", alias = "added_files_count")]
        pub added_files_count: i32,
        #[serde(alias = "existing_data_files_count", alias = "existing_files_count")]
        pub existing_files_count: i32,
        #[serde(alias = "deleted_data_files_count", alias = "deleted_files_count")]
        pub deleted_files_count: i32,
        pub added_rows_count: i64,
        pub existing_rows_count: i64,
        pub deleted_rows_count: i64,
        pub partitions: Option<Vec<FieldSummary>>,
        pub key_metadata: Option<ByteBuf>,
        pub first_row_id: Option<u64>,
    }
1067
1068 impl ManifestFileV3 {
1069 pub fn try_into(self) -> Result<ManifestFile> {
1071 let manifest_file = ManifestFile {
1072 manifest_path: self.manifest_path,
1073 manifest_length: self.manifest_length,
1074 partition_spec_id: self.partition_spec_id,
1075 content: self.content.try_into()?,
1076 sequence_number: self.sequence_number,
1077 min_sequence_number: self.min_sequence_number,
1078 added_snapshot_id: self.added_snapshot_id,
1079 added_files_count: Some(self.added_files_count.try_into()?),
1080 existing_files_count: Some(self.existing_files_count.try_into()?),
1081 deleted_files_count: Some(self.deleted_files_count.try_into()?),
1082 added_rows_count: Some(self.added_rows_count.try_into()?),
1083 existing_rows_count: Some(self.existing_rows_count.try_into()?),
1084 deleted_rows_count: Some(self.deleted_rows_count.try_into()?),
1085 partitions: self.partitions,
1086 key_metadata: self.key_metadata.map(|b| b.into_vec()),
1087 first_row_id: self.first_row_id,
1088 };
1089
1090 Ok(manifest_file)
1091 }
1092 }
1093
1094 impl ManifestFileV2 {
1095 pub fn try_into(self) -> Result<ManifestFile> {
1097 Ok(ManifestFile {
1098 manifest_path: self.manifest_path,
1099 manifest_length: self.manifest_length,
1100 partition_spec_id: self.partition_spec_id,
1101 content: self.content.try_into()?,
1102 sequence_number: self.sequence_number,
1103 min_sequence_number: self.min_sequence_number,
1104 added_snapshot_id: self.added_snapshot_id,
1105 added_files_count: Some(self.added_files_count.try_into()?),
1106 existing_files_count: Some(self.existing_files_count.try_into()?),
1107 deleted_files_count: Some(self.deleted_files_count.try_into()?),
1108 added_rows_count: Some(self.added_rows_count.try_into()?),
1109 existing_rows_count: Some(self.existing_rows_count.try_into()?),
1110 deleted_rows_count: Some(self.deleted_rows_count.try_into()?),
1111 partitions: self.partitions,
1112 key_metadata: self.key_metadata.map(|b| b.into_vec()),
1113 first_row_id: None,
1114 })
1115 }
1116 }
1117
1118 fn v2_default_content_for_v1() -> i32 {
1119 super::ManifestContentType::Data as i32
1120 }
1121
1122 fn v2_default_sequence_number_for_v1() -> i64 {
1123 0
1124 }
1125
1126 fn v2_default_min_sequence_number_for_v1() -> i64 {
1127 0
1128 }
1129
1130 impl ManifestFileV1 {
1131 pub fn try_into(self) -> Result<ManifestFile> {
1133 Ok(ManifestFile {
1134 manifest_path: self.manifest_path,
1135 manifest_length: self.manifest_length,
1136 partition_spec_id: self.partition_spec_id,
1137 added_snapshot_id: self.added_snapshot_id,
1138 added_files_count: self
1139 .added_data_files_count
1140 .map(TryInto::try_into)
1141 .transpose()?,
1142 existing_files_count: self
1143 .existing_data_files_count
1144 .map(TryInto::try_into)
1145 .transpose()?,
1146 deleted_files_count: self
1147 .deleted_data_files_count
1148 .map(TryInto::try_into)
1149 .transpose()?,
1150 added_rows_count: self.added_rows_count.map(TryInto::try_into).transpose()?,
1151 existing_rows_count: self
1152 .existing_rows_count
1153 .map(TryInto::try_into)
1154 .transpose()?,
1155 deleted_rows_count: self.deleted_rows_count.map(TryInto::try_into).transpose()?,
1156 partitions: self.partitions,
1157 key_metadata: self.key_metadata.map(|b| b.into_vec()),
1158 content: super::ManifestContentType::Data,
1161 sequence_number: 0,
1162 min_sequence_number: 0,
1163 first_row_id: None,
1164 })
1165 }
1166 }
1167
1168 fn convert_to_serde_key_metadata(key_metadata: Option<Vec<u8>>) -> Option<ByteBuf> {
1169 match key_metadata {
1170 Some(metadata) if !metadata.is_empty() => Some(ByteBuf::from(metadata)),
1171 _ => None,
1172 }
1173 }
1174
1175 impl TryFrom<ManifestFile> for ManifestFileV3 {
1176 type Error = Error;
1177
1178 fn try_from(value: ManifestFile) -> std::result::Result<Self, Self::Error> {
1179 let key_metadata = convert_to_serde_key_metadata(value.key_metadata);
1180 Ok(Self {
1181 manifest_path: value.manifest_path,
1182 manifest_length: value.manifest_length,
1183 partition_spec_id: value.partition_spec_id,
1184 content: value.content as i32,
1185 sequence_number: value.sequence_number,
1186 min_sequence_number: value.min_sequence_number,
1187 added_snapshot_id: value.added_snapshot_id,
1188 added_files_count: value
1189 .added_files_count
1190 .ok_or_else(|| {
1191 Error::new(
1192 crate::ErrorKind::DataInvalid,
1193 "added_data_files_count in ManifestFileV3 is required",
1194 )
1195 })?
1196 .try_into()?,
1197 existing_files_count: value
1198 .existing_files_count
1199 .ok_or_else(|| {
1200 Error::new(
1201 crate::ErrorKind::DataInvalid,
1202 "existing_data_files_count in ManifestFileV3 is required",
1203 )
1204 })?
1205 .try_into()?,
1206 deleted_files_count: value
1207 .deleted_files_count
1208 .ok_or_else(|| {
1209 Error::new(
1210 crate::ErrorKind::DataInvalid,
1211 "deleted_data_files_count in ManifestFileV3 is required",
1212 )
1213 })?
1214 .try_into()?,
1215 added_rows_count: value
1216 .added_rows_count
1217 .ok_or_else(|| {
1218 Error::new(
1219 crate::ErrorKind::DataInvalid,
1220 "added_rows_count in ManifestFileV3 is required",
1221 )
1222 })?
1223 .try_into()?,
1224 existing_rows_count: value
1225 .existing_rows_count
1226 .ok_or_else(|| {
1227 Error::new(
1228 crate::ErrorKind::DataInvalid,
1229 "existing_rows_count in ManifestFileV3 is required",
1230 )
1231 })?
1232 .try_into()?,
1233 deleted_rows_count: value
1234 .deleted_rows_count
1235 .ok_or_else(|| {
1236 Error::new(
1237 crate::ErrorKind::DataInvalid,
1238 "deleted_rows_count in ManifestFileV3 is required",
1239 )
1240 })?
1241 .try_into()?,
1242 partitions: value.partitions,
1243 key_metadata,
1244 first_row_id: value.first_row_id,
1245 })
1246 }
1247 }
1248
1249 impl TryFrom<ManifestFile> for ManifestFileV2 {
1250 type Error = Error;
1251
1252 fn try_from(value: ManifestFile) -> std::result::Result<Self, Self::Error> {
1253 let key_metadata = convert_to_serde_key_metadata(value.key_metadata);
1254 Ok(Self {
1255 manifest_path: value.manifest_path,
1256 manifest_length: value.manifest_length,
1257 partition_spec_id: value.partition_spec_id,
1258 content: value.content as i32,
1259 sequence_number: value.sequence_number,
1260 min_sequence_number: value.min_sequence_number,
1261 added_snapshot_id: value.added_snapshot_id,
1262 added_files_count: value
1263 .added_files_count
1264 .ok_or_else(|| {
1265 Error::new(
1266 crate::ErrorKind::DataInvalid,
1267 "added_data_files_count in ManifestFileV2 should be require",
1268 )
1269 })?
1270 .try_into()?,
1271 existing_files_count: value
1272 .existing_files_count
1273 .ok_or_else(|| {
1274 Error::new(
1275 crate::ErrorKind::DataInvalid,
1276 "existing_data_files_count in ManifestFileV2 should be require",
1277 )
1278 })?
1279 .try_into()?,
1280 deleted_files_count: value
1281 .deleted_files_count
1282 .ok_or_else(|| {
1283 Error::new(
1284 crate::ErrorKind::DataInvalid,
1285 "deleted_data_files_count in ManifestFileV2 should be require",
1286 )
1287 })?
1288 .try_into()?,
1289 added_rows_count: value
1290 .added_rows_count
1291 .ok_or_else(|| {
1292 Error::new(
1293 crate::ErrorKind::DataInvalid,
1294 "added_rows_count in ManifestFileV2 should be require",
1295 )
1296 })?
1297 .try_into()?,
1298 existing_rows_count: value
1299 .existing_rows_count
1300 .ok_or_else(|| {
1301 Error::new(
1302 crate::ErrorKind::DataInvalid,
1303 "existing_rows_count in ManifestFileV2 should be require",
1304 )
1305 })?
1306 .try_into()?,
1307 deleted_rows_count: value
1308 .deleted_rows_count
1309 .ok_or_else(|| {
1310 Error::new(
1311 crate::ErrorKind::DataInvalid,
1312 "deleted_rows_count in ManifestFileV2 should be require",
1313 )
1314 })?
1315 .try_into()?,
1316 partitions: value.partitions,
1317 key_metadata,
1318 })
1319 }
1320 }
1321
1322 impl TryFrom<ManifestFile> for ManifestFileV1 {
1323 type Error = Error;
1324
1325 fn try_from(value: ManifestFile) -> std::result::Result<Self, Self::Error> {
1326 let key_metadata = convert_to_serde_key_metadata(value.key_metadata);
1327 Ok(Self {
1328 manifest_path: value.manifest_path,
1329 manifest_length: value.manifest_length,
1330 partition_spec_id: value.partition_spec_id,
1331 added_snapshot_id: value.added_snapshot_id,
1332 added_data_files_count: value
1333 .added_files_count
1334 .map(TryInto::try_into)
1335 .transpose()?,
1336 existing_data_files_count: value
1337 .existing_files_count
1338 .map(TryInto::try_into)
1339 .transpose()?,
1340 deleted_data_files_count: value
1341 .deleted_files_count
1342 .map(TryInto::try_into)
1343 .transpose()?,
1344 added_rows_count: value.added_rows_count.map(TryInto::try_into).transpose()?,
1345 existing_rows_count: value
1346 .existing_rows_count
1347 .map(TryInto::try_into)
1348 .transpose()?,
1349 deleted_rows_count: value
1350 .deleted_rows_count
1351 .map(TryInto::try_into)
1352 .transpose()?,
1353 partitions: value.partitions,
1354 key_metadata,
1355 })
1356 }
1357 }
1358}
1359
1360#[cfg(test)]
1361mod test {
1362 use std::fs;
1363
1364 use apache_avro::{Reader, Schema};
1365 use tempfile::TempDir;
1366
1367 use super::_serde::ManifestListV2;
1368 use crate::io::FileIOBuilder;
1369 use crate::spec::manifest_list::_serde::{ManifestListV1, ManifestListV3};
1370 use crate::spec::{
1371 Datum, FieldSummary, ManifestContentType, ManifestFile, ManifestList, ManifestListWriter,
1372 UNASSIGNED_SEQUENCE_NUMBER,
1373 };
1374
1375 #[tokio::test]
1376 async fn test_parse_manifest_list_v1() {
1377 let manifest_list = ManifestList {
1378 entries: vec![
1379 ManifestFile {
1380 manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
1381 manifest_length: 5806,
1382 partition_spec_id: 0,
1383 content: ManifestContentType::Data,
1384 sequence_number: 0,
1385 min_sequence_number: 0,
1386 added_snapshot_id: 1646658105718557341,
1387 added_files_count: Some(3),
1388 existing_files_count: Some(0),
1389 deleted_files_count: Some(0),
1390 added_rows_count: Some(3),
1391 existing_rows_count: Some(0),
1392 deleted_rows_count: Some(0),
1393 partitions: Some(vec![]),
1394 key_metadata: None,
1395 first_row_id: None,
1396 }
1397 ]
1398 };
1399
1400 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1401
1402 let tmp_dir = TempDir::new().unwrap();
1403 let file_name = "simple_manifest_list_v1.avro";
1404 let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
1405
1406 let mut writer = ManifestListWriter::v1(
1407 file_io.new_output(full_path.clone()).unwrap(),
1408 1646658105718557341,
1409 Some(1646658105718557341),
1410 );
1411
1412 writer
1413 .add_manifests(manifest_list.entries.clone().into_iter())
1414 .unwrap();
1415 writer.close().await.unwrap();
1416
1417 let bs = fs::read(full_path).expect("read_file must succeed");
1418
1419 let parsed_manifest_list =
1420 ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V1).unwrap();
1421
1422 assert_eq!(manifest_list, parsed_manifest_list);
1423 }
1424
1425 #[tokio::test]
1426 async fn test_parse_manifest_list_v2() {
1427 let manifest_list = ManifestList {
1428 entries: vec![
1429 ManifestFile {
1430 manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
1431 manifest_length: 6926,
1432 partition_spec_id: 1,
1433 content: ManifestContentType::Data,
1434 sequence_number: 1,
1435 min_sequence_number: 1,
1436 added_snapshot_id: 377075049360453639,
1437 added_files_count: Some(1),
1438 existing_files_count: Some(0),
1439 deleted_files_count: Some(0),
1440 added_rows_count: Some(3),
1441 existing_rows_count: Some(0),
1442 deleted_rows_count: Some(0),
1443 partitions: Some(
1444 vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
1445 ),
1446 key_metadata: None,
1447 first_row_id: None,
1448 },
1449 ManifestFile {
1450 manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m1.avro".to_string(),
1451 manifest_length: 6926,
1452 partition_spec_id: 2,
1453 content: ManifestContentType::Data,
1454 sequence_number: 1,
1455 min_sequence_number: 1,
1456 added_snapshot_id: 377075049360453639,
1457 added_files_count: Some(1),
1458 existing_files_count: Some(0),
1459 deleted_files_count: Some(0),
1460 added_rows_count: Some(3),
1461 existing_rows_count: Some(0),
1462 deleted_rows_count: Some(0),
1463 partitions: Some(
1464 vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::float(1.1).to_bytes().unwrap()), upper_bound: Some(Datum::float(2.1).to_bytes().unwrap())}]
1465 ),
1466 key_metadata: None,
1467 first_row_id: None,
1468 }
1469 ]
1470 };
1471
1472 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1473
1474 let tmp_dir = TempDir::new().unwrap();
1475 let file_name = "simple_manifest_list_v1.avro";
1476 let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
1477
1478 let mut writer = ManifestListWriter::v2(
1479 file_io.new_output(full_path.clone()).unwrap(),
1480 1646658105718557341,
1481 Some(1646658105718557341),
1482 1,
1483 );
1484
1485 writer
1486 .add_manifests(manifest_list.entries.clone().into_iter())
1487 .unwrap();
1488 writer.close().await.unwrap();
1489
1490 let bs = fs::read(full_path).expect("read_file must succeed");
1491
1492 let parsed_manifest_list =
1493 ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2).unwrap();
1494
1495 assert_eq!(manifest_list, parsed_manifest_list);
1496 }
1497
1498 #[tokio::test]
1499 async fn test_parse_manifest_list_v3() {
1500 let manifest_list = ManifestList {
1501 entries: vec![
1502 ManifestFile {
1503 manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
1504 manifest_length: 6926,
1505 partition_spec_id: 1,
1506 content: ManifestContentType::Data,
1507 sequence_number: 1,
1508 min_sequence_number: 1,
1509 added_snapshot_id: 377075049360453639,
1510 added_files_count: Some(1),
1511 existing_files_count: Some(0),
1512 deleted_files_count: Some(0),
1513 added_rows_count: Some(3),
1514 existing_rows_count: Some(0),
1515 deleted_rows_count: Some(0),
1516 partitions: Some(
1517 vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
1518 ),
1519 key_metadata: None,
1520 first_row_id: Some(10),
1521 },
1522 ManifestFile {
1523 manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m1.avro".to_string(),
1524 manifest_length: 6926,
1525 partition_spec_id: 2,
1526 content: ManifestContentType::Data,
1527 sequence_number: 1,
1528 min_sequence_number: 1,
1529 added_snapshot_id: 377075049360453639,
1530 added_files_count: Some(1),
1531 existing_files_count: Some(0),
1532 deleted_files_count: Some(0),
1533 added_rows_count: Some(3),
1534 existing_rows_count: Some(0),
1535 deleted_rows_count: Some(0),
1536 partitions: Some(
1537 vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::float(1.1).to_bytes().unwrap()), upper_bound: Some(Datum::float(2.1).to_bytes().unwrap())}]
1538 ),
1539 key_metadata: None,
1540 first_row_id: Some(13),
1541 }
1542 ]
1543 };
1544
1545 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1546
1547 let tmp_dir = TempDir::new().unwrap();
1548 let file_name = "simple_manifest_list_v3.avro";
1549 let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
1550
1551 let mut writer = ManifestListWriter::v3(
1552 file_io.new_output(full_path.clone()).unwrap(),
1553 377075049360453639,
1554 Some(377075049360453639),
1555 1,
1556 Some(10),
1557 );
1558
1559 writer
1560 .add_manifests(manifest_list.entries.clone().into_iter())
1561 .unwrap();
1562 writer.close().await.unwrap();
1563
1564 let bs = fs::read(full_path).expect("read_file must succeed");
1565
1566 let parsed_manifest_list =
1567 ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V3).unwrap();
1568
1569 assert_eq!(manifest_list, parsed_manifest_list);
1570 }
1571
1572 #[test]
1573 fn test_serialize_manifest_list_v1() {
1574 let manifest_list:ManifestListV1 = ManifestList {
1575 entries: vec![ManifestFile {
1576 manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
1577 manifest_length: 5806,
1578 partition_spec_id: 0,
1579 content: ManifestContentType::Data,
1580 sequence_number: 0,
1581 min_sequence_number: 0,
1582 added_snapshot_id: 1646658105718557341,
1583 added_files_count: Some(3),
1584 existing_files_count: Some(0),
1585 deleted_files_count: Some(0),
1586 added_rows_count: Some(3),
1587 existing_rows_count: Some(0),
1588 deleted_rows_count: Some(0),
1589 partitions: None,
1590 key_metadata: None,
1591 first_row_id: None,
1592 }]
1593 }.try_into().unwrap();
1594 let result = serde_json::to_string(&manifest_list).unwrap();
1595 assert_eq!(
1596 result,
1597 r#"[{"manifest_path":"/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro","manifest_length":5806,"partition_spec_id":0,"added_snapshot_id":1646658105718557341,"added_data_files_count":3,"existing_data_files_count":0,"deleted_data_files_count":0,"added_rows_count":3,"existing_rows_count":0,"deleted_rows_count":0,"partitions":null,"key_metadata":null}]"#
1598 );
1599 }
1600
1601 #[test]
1602 fn test_serialize_manifest_list_v2() {
1603 let manifest_list:ManifestListV2 = ManifestList {
1604 entries: vec![ManifestFile {
1605 manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
1606 manifest_length: 6926,
1607 partition_spec_id: 1,
1608 content: ManifestContentType::Data,
1609 sequence_number: 1,
1610 min_sequence_number: 1,
1611 added_snapshot_id: 377075049360453639,
1612 added_files_count: Some(1),
1613 existing_files_count: Some(0),
1614 deleted_files_count: Some(0),
1615 added_rows_count: Some(3),
1616 existing_rows_count: Some(0),
1617 deleted_rows_count: Some(0),
1618 partitions: Some(
1619 vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
1620 ),
1621 key_metadata: None,
1622 first_row_id: None,
1623 }]
1624 }.try_into().unwrap();
1625 let result = serde_json::to_string(&manifest_list).unwrap();
1626 assert_eq!(
1627 result,
1628 r#"[{"manifest_path":"s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro","manifest_length":6926,"partition_spec_id":1,"content":0,"sequence_number":1,"min_sequence_number":1,"added_snapshot_id":377075049360453639,"added_files_count":1,"existing_files_count":0,"deleted_files_count":0,"added_rows_count":3,"existing_rows_count":0,"deleted_rows_count":0,"partitions":[{"contains_null":false,"contains_nan":false,"lower_bound":[1,0,0,0,0,0,0,0],"upper_bound":[1,0,0,0,0,0,0,0]}],"key_metadata":null}]"#
1629 );
1630 }
1631
1632 #[test]
1633 fn test_serialize_manifest_list_v3() {
1634 let manifest_list: ManifestListV3 = ManifestList {
1635 entries: vec![ManifestFile {
1636 manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
1637 manifest_length: 6926,
1638 partition_spec_id: 1,
1639 content: ManifestContentType::Data,
1640 sequence_number: 1,
1641 min_sequence_number: 1,
1642 added_snapshot_id: 377075049360453639,
1643 added_files_count: Some(1),
1644 existing_files_count: Some(0),
1645 deleted_files_count: Some(0),
1646 added_rows_count: Some(3),
1647 existing_rows_count: Some(0),
1648 deleted_rows_count: Some(0),
1649 partitions: Some(
1650 vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
1651 ),
1652 key_metadata: None,
1653 first_row_id: Some(10),
1654 }]
1655 }.try_into().unwrap();
1656 let result = serde_json::to_string(&manifest_list).unwrap();
1657 assert_eq!(
1658 result,
1659 r#"[{"manifest_path":"s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro","manifest_length":6926,"partition_spec_id":1,"content":0,"sequence_number":1,"min_sequence_number":1,"added_snapshot_id":377075049360453639,"added_files_count":1,"existing_files_count":0,"deleted_files_count":0,"added_rows_count":3,"existing_rows_count":0,"deleted_rows_count":0,"partitions":[{"contains_null":false,"contains_nan":false,"lower_bound":[1,0,0,0,0,0,0,0],"upper_bound":[1,0,0,0,0,0,0,0]}],"key_metadata":null,"first_row_id":10}]"#
1660 );
1661 }
1662
1663 #[tokio::test]
1664 async fn test_manifest_list_writer_v1() {
1665 let expected_manifest_list = ManifestList {
1666 entries: vec![ManifestFile {
1667 manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
1668 manifest_length: 5806,
1669 partition_spec_id: 1,
1670 content: ManifestContentType::Data,
1671 sequence_number: 0,
1672 min_sequence_number: 0,
1673 added_snapshot_id: 1646658105718557341,
1674 added_files_count: Some(3),
1675 existing_files_count: Some(0),
1676 deleted_files_count: Some(0),
1677 added_rows_count: Some(3),
1678 existing_rows_count: Some(0),
1679 deleted_rows_count: Some(0),
1680 partitions: Some(
1681 vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}],
1682 ),
1683 key_metadata: None,
1684 first_row_id: None,
1685 }]
1686 };
1687
1688 let temp_dir = TempDir::new().unwrap();
1689 let path = temp_dir.path().join("manifest_list_v1.avro");
1690 let io = FileIOBuilder::new_fs_io().build().unwrap();
1691 let output_file = io.new_output(path.to_str().unwrap()).unwrap();
1692
1693 let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, Some(0));
1694 writer
1695 .add_manifests(expected_manifest_list.entries.clone().into_iter())
1696 .unwrap();
1697 writer.close().await.unwrap();
1698
1699 let bs = fs::read(path).unwrap();
1700
1701 let manifest_list =
1702 ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V1).unwrap();
1703 assert_eq!(manifest_list, expected_manifest_list);
1704
1705 temp_dir.close().unwrap();
1706 }
1707
1708 #[tokio::test]
1709 async fn test_manifest_list_writer_v2() {
1710 let snapshot_id = 377075049360453639;
1711 let seq_num = 1;
1712 let mut expected_manifest_list = ManifestList {
1713 entries: vec![ManifestFile {
1714 manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
1715 manifest_length: 6926,
1716 partition_spec_id: 1,
1717 content: ManifestContentType::Data,
1718 sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
1719 min_sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
1720 added_snapshot_id: snapshot_id,
1721 added_files_count: Some(1),
1722 existing_files_count: Some(0),
1723 deleted_files_count: Some(0),
1724 added_rows_count: Some(3),
1725 existing_rows_count: Some(0),
1726 deleted_rows_count: Some(0),
1727 partitions: Some(
1728 vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
1729 ),
1730 key_metadata: None,
1731 first_row_id: None,
1732 }]
1733 };
1734
1735 let temp_dir = TempDir::new().unwrap();
1736 let path = temp_dir.path().join("manifest_list_v2.avro");
1737 let io = FileIOBuilder::new_fs_io().build().unwrap();
1738 let output_file = io.new_output(path.to_str().unwrap()).unwrap();
1739
1740 let mut writer = ManifestListWriter::v2(output_file, snapshot_id, Some(0), seq_num);
1741 writer
1742 .add_manifests(expected_manifest_list.entries.clone().into_iter())
1743 .unwrap();
1744 writer.close().await.unwrap();
1745
1746 let bs = fs::read(path).unwrap();
1747 let manifest_list =
1748 ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2).unwrap();
1749 expected_manifest_list.entries[0].sequence_number = seq_num;
1750 expected_manifest_list.entries[0].min_sequence_number = seq_num;
1751 assert_eq!(manifest_list, expected_manifest_list);
1752
1753 temp_dir.close().unwrap();
1754 }
1755
1756 #[tokio::test]
1757 async fn test_manifest_list_writer_v3() {
1758 let snapshot_id = 377075049360453639;
1759 let seq_num = 1;
1760 let mut expected_manifest_list = ManifestList {
1761 entries: vec![ManifestFile {
1762 manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
1763 manifest_length: 6926,
1764 partition_spec_id: 1,
1765 content: ManifestContentType::Data,
1766 sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
1767 min_sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
1768 added_snapshot_id: snapshot_id,
1769 added_files_count: Some(1),
1770 existing_files_count: Some(0),
1771 deleted_files_count: Some(0),
1772 added_rows_count: Some(3),
1773 existing_rows_count: Some(0),
1774 deleted_rows_count: Some(0),
1775 partitions: Some(
1776 vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
1777 ),
1778 key_metadata: None,
1779 first_row_id: Some(10),
1780 }]
1781 };
1782
1783 let temp_dir = TempDir::new().unwrap();
1784 let path = temp_dir.path().join("manifest_list_v2.avro");
1785 let io = FileIOBuilder::new_fs_io().build().unwrap();
1786 let output_file = io.new_output(path.to_str().unwrap()).unwrap();
1787
1788 let mut writer =
1789 ManifestListWriter::v3(output_file, snapshot_id, Some(0), seq_num, Some(10));
1790 writer
1791 .add_manifests(expected_manifest_list.entries.clone().into_iter())
1792 .unwrap();
1793 writer.close().await.unwrap();
1794
1795 let bs = fs::read(path).unwrap();
1796 let manifest_list =
1797 ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V3).unwrap();
1798 expected_manifest_list.entries[0].sequence_number = seq_num;
1799 expected_manifest_list.entries[0].min_sequence_number = seq_num;
1800 expected_manifest_list.entries[0].first_row_id = Some(10);
1801 assert_eq!(manifest_list, expected_manifest_list);
1802
1803 temp_dir.close().unwrap();
1804 }
1805
1806 #[tokio::test]
1807 async fn test_manifest_list_writer_v1_as_v2() {
1808 let expected_manifest_list = ManifestList {
1809 entries: vec![ManifestFile {
1810 manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
1811 manifest_length: 5806,
1812 partition_spec_id: 1,
1813 content: ManifestContentType::Data,
1814 sequence_number: 0,
1815 min_sequence_number: 0,
1816 added_snapshot_id: 1646658105718557341,
1817 added_files_count: Some(3),
1818 existing_files_count: Some(0),
1819 deleted_files_count: Some(0),
1820 added_rows_count: Some(3),
1821 existing_rows_count: Some(0),
1822 deleted_rows_count: Some(0),
1823 partitions: Some(
1824 vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
1825 ),
1826 key_metadata: None,
1827 first_row_id: None,
1828 }]
1829 };
1830
1831 let temp_dir = TempDir::new().unwrap();
1832 let path = temp_dir.path().join("manifest_list_v1.avro");
1833 let io = FileIOBuilder::new_fs_io().build().unwrap();
1834 let output_file = io.new_output(path.to_str().unwrap()).unwrap();
1835
1836 let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, Some(0));
1837 writer
1838 .add_manifests(expected_manifest_list.entries.clone().into_iter())
1839 .unwrap();
1840 writer.close().await.unwrap();
1841
1842 let bs = fs::read(path).unwrap();
1843
1844 let manifest_list =
1845 ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2).unwrap();
1846 assert_eq!(manifest_list, expected_manifest_list);
1847
1848 temp_dir.close().unwrap();
1849 }
1850
1851 #[tokio::test]
1852 async fn test_manifest_list_writer_v1_as_v3() {
1853 let expected_manifest_list = ManifestList {
1854 entries: vec![ManifestFile {
1855 manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
1856 manifest_length: 5806,
1857 partition_spec_id: 1,
1858 content: ManifestContentType::Data,
1859 sequence_number: 0,
1860 min_sequence_number: 0,
1861 added_snapshot_id: 1646658105718557341,
1862 added_files_count: Some(3),
1863 existing_files_count: Some(0),
1864 deleted_files_count: Some(0),
1865 added_rows_count: Some(3),
1866 existing_rows_count: Some(0),
1867 deleted_rows_count: Some(0),
1868 partitions: Some(
1869 vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
1870 ),
1871 key_metadata: None,
1872 first_row_id: None,
1873 }]
1874 };
1875
1876 let temp_dir = TempDir::new().unwrap();
1877 let path = temp_dir.path().join("manifest_list_v1.avro");
1878 let io = FileIOBuilder::new_fs_io().build().unwrap();
1879 let output_file = io.new_output(path.to_str().unwrap()).unwrap();
1880
1881 let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, Some(0));
1882 writer
1883 .add_manifests(expected_manifest_list.entries.clone().into_iter())
1884 .unwrap();
1885 writer.close().await.unwrap();
1886
1887 let bs = fs::read(path).unwrap();
1888
1889 let manifest_list =
1890 ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V3).unwrap();
1891 assert_eq!(manifest_list, expected_manifest_list);
1892
1893 temp_dir.close().unwrap();
1894 }
1895
1896 #[tokio::test]
1897 async fn test_manifest_list_writer_v2_as_v3() {
1898 let snapshot_id = 377075049360453639;
1899 let seq_num = 1;
1900 let mut expected_manifest_list = ManifestList {
1901 entries: vec![ManifestFile {
1902 manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
1903 manifest_length: 6926,
1904 partition_spec_id: 1,
1905 content: ManifestContentType::Data,
1906 sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
1907 min_sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
1908 added_snapshot_id: snapshot_id,
1909 added_files_count: Some(1),
1910 existing_files_count: Some(0),
1911 deleted_files_count: Some(0),
1912 added_rows_count: Some(3),
1913 existing_rows_count: Some(0),
1914 deleted_rows_count: Some(0),
1915 partitions: Some(
1916 vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
1917 ),
1918 key_metadata: None,
1919 first_row_id: None,
1920 }]
1921 };
1922
1923 let temp_dir = TempDir::new().unwrap();
1924 let path = temp_dir.path().join("manifest_list_v2.avro");
1925 let io = FileIOBuilder::new_fs_io().build().unwrap();
1926 let output_file = io.new_output(path.to_str().unwrap()).unwrap();
1927
1928 let mut writer = ManifestListWriter::v2(output_file, snapshot_id, Some(0), seq_num);
1929 writer
1930 .add_manifests(expected_manifest_list.entries.clone().into_iter())
1931 .unwrap();
1932 writer.close().await.unwrap();
1933
1934 let bs = fs::read(path).unwrap();
1935
1936 let manifest_list =
1937 ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V3).unwrap();
1938 expected_manifest_list.entries[0].sequence_number = seq_num;
1939 expected_manifest_list.entries[0].min_sequence_number = seq_num;
1940 assert_eq!(manifest_list, expected_manifest_list);
1941
1942 temp_dir.close().unwrap();
1943 }
1944
1945 #[tokio::test]
1946 async fn test_manifest_list_v2_deserializer_aliases() {
1947 let avro_1_path = "testdata/manifests_lists/manifest-list-v2-1.avro";
1949 let bs_1 = fs::read(avro_1_path).unwrap();
1950 let avro_1_fields = read_avro_schema_fields_as_str(bs_1.clone()).await;
1951 assert_eq!(
1952 avro_1_fields,
1953 "manifest_path, manifest_length, partition_spec_id, content, sequence_number, min_sequence_number, added_snapshot_id, added_data_files_count, existing_data_files_count, deleted_data_files_count, added_rows_count, existing_rows_count, deleted_rows_count, partitions"
1954 );
1955 let avro_2_path = "testdata/manifests_lists/manifest-list-v2-2.avro";
1957 let bs_2 = fs::read(avro_2_path).unwrap();
1958 let avro_2_fields = read_avro_schema_fields_as_str(bs_2.clone()).await;
1959 assert_eq!(
1960 avro_2_fields,
1961 "manifest_path, manifest_length, partition_spec_id, content, sequence_number, min_sequence_number, added_snapshot_id, added_files_count, existing_files_count, deleted_files_count, added_rows_count, existing_rows_count, deleted_rows_count, partitions"
1962 );
1963 let _manifest_list_1 =
1965 ManifestList::parse_with_version(&bs_1, crate::spec::FormatVersion::V2).unwrap();
1966 let _manifest_list_2 =
1967 ManifestList::parse_with_version(&bs_2, crate::spec::FormatVersion::V2).unwrap();
1968 }
1969
1970 async fn read_avro_schema_fields_as_str(bs: Vec<u8>) -> String {
1971 let reader = Reader::new(&bs[..]).unwrap();
1972 let schema = reader.writer_schema();
1973 let fields: String = match schema {
1974 Schema::Record(record) => record
1975 .fields
1976 .iter()
1977 .map(|field| field.name.clone())
1978 .collect::<Vec<String>>()
1979 .join(", "),
1980 _ => "".to_string(),
1981 };
1982 fields
1983 }
1984
1985 #[test]
1986 fn test_manifest_content_type_default() {
1987 assert_eq!(ManifestContentType::default(), ManifestContentType::Data);
1988 }
1989
1990 #[test]
1991 fn test_manifest_content_type_default_value() {
1992 assert_eq!(ManifestContentType::default() as i32, 0);
1993 }
1994
1995 #[test]
1996 fn test_manifest_file_v1_to_v2_projection() {
1997 use crate::spec::manifest_list::_serde::ManifestFileV1;
1998
1999 let v1_manifest = ManifestFileV1 {
2001 manifest_path: "/test/manifest.avro".to_string(),
2002 manifest_length: 5806,
2003 partition_spec_id: 0,
2004 added_snapshot_id: 1646658105718557341,
2005 added_data_files_count: Some(3),
2006 existing_data_files_count: Some(0),
2007 deleted_data_files_count: Some(0),
2008 added_rows_count: Some(3),
2009 existing_rows_count: Some(0),
2010 deleted_rows_count: Some(0),
2011 partitions: None,
2012 key_metadata: None,
2013 };
2014
2015 let v2_manifest: ManifestFile = v1_manifest.try_into().unwrap();
2017
2018 assert_eq!(
2020 v2_manifest.content,
2021 ManifestContentType::Data,
2022 "V1 manifest content should default to Data (0)"
2023 );
2024 assert_eq!(
2025 v2_manifest.sequence_number, 0,
2026 "V1 manifest sequence_number should default to 0"
2027 );
2028 assert_eq!(
2029 v2_manifest.min_sequence_number, 0,
2030 "V1 manifest min_sequence_number should default to 0"
2031 );
2032
2033 assert_eq!(v2_manifest.manifest_path, "/test/manifest.avro");
2035 assert_eq!(v2_manifest.manifest_length, 5806);
2036 assert_eq!(v2_manifest.partition_spec_id, 0);
2037 assert_eq!(v2_manifest.added_snapshot_id, 1646658105718557341);
2038 assert_eq!(v2_manifest.added_files_count, Some(3));
2039 assert_eq!(v2_manifest.existing_files_count, Some(0));
2040 assert_eq!(v2_manifest.deleted_files_count, Some(0));
2041 assert_eq!(v2_manifest.added_rows_count, Some(3));
2042 assert_eq!(v2_manifest.existing_rows_count, Some(0));
2043 assert_eq!(v2_manifest.deleted_rows_count, Some(0));
2044 assert_eq!(v2_manifest.partitions, None);
2045 assert_eq!(v2_manifest.key_metadata, None);
2046 }
2047}