1use std::cmp::min;
19
20use apache_avro::{Writer as AvroWriter, to_value};
21use bytes::Bytes;
22use itertools::Itertools;
23use serde_json::to_vec;
24
25use super::{
26 Datum, FormatVersion, ManifestContentType, PartitionSpec, PrimitiveType,
27 UNASSIGNED_SEQUENCE_NUMBER,
28};
29use crate::error::Result;
30use crate::io::OutputFile;
31use crate::spec::manifest::_serde::{ManifestEntryV1, ManifestEntryV2};
32use crate::spec::manifest::{manifest_schema_v1, manifest_schema_v2};
33use crate::spec::{
34 DataContentType, DataFile, FieldSummary, ManifestEntry, ManifestFile, ManifestMetadata,
35 ManifestStatus, PrimitiveLiteral, SchemaRef, StructType, UNASSIGNED_SNAPSHOT_ID,
36};
37use crate::{Error, ErrorKind};
38
39pub struct ManifestWriterBuilder {
41 output: OutputFile,
42 snapshot_id: Option<i64>,
43 key_metadata: Option<Vec<u8>>,
44 schema: SchemaRef,
45 partition_spec: PartitionSpec,
46}
47
48impl ManifestWriterBuilder {
49 pub fn new(
51 output: OutputFile,
52 snapshot_id: Option<i64>,
53 key_metadata: Option<Vec<u8>>,
54 schema: SchemaRef,
55 partition_spec: PartitionSpec,
56 ) -> Self {
57 Self {
58 output,
59 snapshot_id,
60 key_metadata,
61 schema,
62 partition_spec,
63 }
64 }
65
66 pub fn build_v1(self) -> ManifestWriter {
68 let metadata = ManifestMetadata::builder()
69 .schema_id(self.schema.schema_id())
70 .schema(self.schema)
71 .partition_spec(self.partition_spec)
72 .format_version(FormatVersion::V1)
73 .content(ManifestContentType::Data)
74 .build();
75 ManifestWriter::new(
76 self.output,
77 self.snapshot_id,
78 self.key_metadata,
79 metadata,
80 None,
81 )
82 }
83
84 pub fn build_v2_data(self) -> ManifestWriter {
86 let metadata = ManifestMetadata::builder()
87 .schema_id(self.schema.schema_id())
88 .schema(self.schema)
89 .partition_spec(self.partition_spec)
90 .format_version(FormatVersion::V2)
91 .content(ManifestContentType::Data)
92 .build();
93 ManifestWriter::new(
94 self.output,
95 self.snapshot_id,
96 self.key_metadata,
97 metadata,
98 None,
99 )
100 }
101
102 pub fn build_v2_deletes(self) -> ManifestWriter {
104 let metadata = ManifestMetadata::builder()
105 .schema_id(self.schema.schema_id())
106 .schema(self.schema)
107 .partition_spec(self.partition_spec)
108 .format_version(FormatVersion::V2)
109 .content(ManifestContentType::Deletes)
110 .build();
111 ManifestWriter::new(
112 self.output,
113 self.snapshot_id,
114 self.key_metadata,
115 metadata,
116 None,
117 )
118 }
119
120 pub fn build_v3_data(self) -> ManifestWriter {
122 let metadata = ManifestMetadata::builder()
123 .schema_id(self.schema.schema_id())
124 .schema(self.schema)
125 .partition_spec(self.partition_spec)
126 .format_version(FormatVersion::V3)
127 .content(ManifestContentType::Data)
128 .build();
129 ManifestWriter::new(
130 self.output,
131 self.snapshot_id,
132 self.key_metadata,
133 metadata,
134 None,
137 )
138 }
139
140 pub fn build_v3_deletes(self) -> ManifestWriter {
142 let metadata = ManifestMetadata::builder()
143 .schema_id(self.schema.schema_id())
144 .schema(self.schema)
145 .partition_spec(self.partition_spec)
146 .format_version(FormatVersion::V3)
147 .content(ManifestContentType::Deletes)
148 .build();
149 ManifestWriter::new(
150 self.output,
151 self.snapshot_id,
152 self.key_metadata,
153 metadata,
154 None,
155 )
156 }
157}
158
159pub struct ManifestWriter {
161 output: OutputFile,
162
163 snapshot_id: Option<i64>,
164
165 added_files: u32,
166 added_rows: u64,
167 existing_files: u32,
168 existing_rows: u64,
169 deleted_files: u32,
170 deleted_rows: u64,
171 first_row_id: Option<u64>,
172
173 min_seq_num: Option<i64>,
174
175 key_metadata: Option<Vec<u8>>,
176
177 manifest_entries: Vec<ManifestEntry>,
178
179 metadata: ManifestMetadata,
180}
181
182impl ManifestWriter {
183 pub(crate) fn new(
185 output: OutputFile,
186 snapshot_id: Option<i64>,
187 key_metadata: Option<Vec<u8>>,
188 metadata: ManifestMetadata,
189 first_row_id: Option<u64>,
190 ) -> Self {
191 Self {
192 output,
193 snapshot_id,
194 added_files: 0,
195 added_rows: 0,
196 existing_files: 0,
197 existing_rows: 0,
198 deleted_files: 0,
199 deleted_rows: 0,
200 first_row_id,
201 min_seq_num: None,
202 key_metadata,
203 manifest_entries: Vec::new(),
204 metadata,
205 }
206 }
207
208 fn construct_partition_summaries(
209 &mut self,
210 partition_type: &StructType,
211 ) -> Result<Vec<FieldSummary>> {
212 let mut field_stats: Vec<_> = partition_type
213 .fields()
214 .iter()
215 .map(|f| PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone()))
216 .collect();
217 for partition in self.manifest_entries.iter().map(|e| &e.data_file.partition) {
218 for (literal, stat) in partition.iter().zip_eq(field_stats.iter_mut()) {
219 let primitive_literal = literal.map(|v| v.as_primitive_literal().unwrap());
220 stat.update(primitive_literal)?;
221 }
222 }
223 Ok(field_stats.into_iter().map(|stat| stat.finish()).collect())
224 }
225
226 fn check_data_file(&self, data_file: &DataFile) -> Result<()> {
227 match self.metadata.content {
228 ManifestContentType::Data => {
229 if data_file.content != DataContentType::Data {
230 return Err(Error::new(
231 ErrorKind::DataInvalid,
232 format!(
233 "Date file at path {} with manifest content type `data`, should have DataContentType `Data`, but has `{:?}`",
234 data_file.file_path(),
235 data_file.content
236 ),
237 ));
238 }
239 }
240 ManifestContentType::Deletes => {
241 if data_file.content != DataContentType::EqualityDeletes
242 && data_file.content != DataContentType::PositionDeletes
243 {
244 return Err(Error::new(
245 ErrorKind::DataInvalid,
246 format!(
247 "Date file at path {} with manifest content type `deletes`, should have DataContentType `Data`, but has `{:?}`",
248 data_file.file_path(),
249 data_file.content
250 ),
251 ));
252 }
253 }
254 }
255 Ok(())
256 }
257
258 pub(crate) fn add_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
264 self.check_data_file(&entry.data_file)?;
265 if entry.sequence_number().is_some_and(|n| n >= 0) {
266 entry.status = ManifestStatus::Added;
267 entry.snapshot_id = self.snapshot_id;
268 entry.file_sequence_number = None;
269 } else {
270 entry.status = ManifestStatus::Added;
271 entry.snapshot_id = self.snapshot_id;
272 entry.sequence_number = None;
273 entry.file_sequence_number = None;
274 };
275 self.add_entry_inner(entry)?;
276 Ok(())
277 }
278
279 pub fn add_file(&mut self, data_file: DataFile, sequence_number: i64) -> Result<()> {
283 self.check_data_file(&data_file)?;
284 let entry = ManifestEntry {
285 status: ManifestStatus::Added,
286 snapshot_id: self.snapshot_id,
287 sequence_number: (sequence_number >= 0).then_some(sequence_number),
288 file_sequence_number: None,
289 data_file,
290 };
291 self.add_entry_inner(entry)?;
292 Ok(())
293 }
294
295 #[allow(dead_code)]
302 pub(crate) fn add_delete_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
303 self.check_data_file(&entry.data_file)?;
304 entry.status = ManifestStatus::Deleted;
305 entry.snapshot_id = self.snapshot_id;
306 self.add_entry_inner(entry)?;
307 Ok(())
308 }
309
310 pub fn add_delete_file(
314 &mut self,
315 data_file: DataFile,
316 sequence_number: i64,
317 file_sequence_number: Option<i64>,
318 ) -> Result<()> {
319 self.check_data_file(&data_file)?;
320 let entry = ManifestEntry {
321 status: ManifestStatus::Deleted,
322 snapshot_id: self.snapshot_id,
323 sequence_number: Some(sequence_number),
324 file_sequence_number,
325 data_file,
326 };
327 self.add_entry_inner(entry)?;
328 Ok(())
329 }
330
331 #[allow(dead_code)]
337 pub(crate) fn add_existing_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
338 self.check_data_file(&entry.data_file)?;
339 entry.status = ManifestStatus::Existing;
340 self.add_entry_inner(entry)?;
341 Ok(())
342 }
343
344 pub fn add_existing_file(
347 &mut self,
348 data_file: DataFile,
349 snapshot_id: i64,
350 sequence_number: i64,
351 file_sequence_number: Option<i64>,
352 ) -> Result<()> {
353 self.check_data_file(&data_file)?;
354 let entry = ManifestEntry {
355 status: ManifestStatus::Existing,
356 snapshot_id: Some(snapshot_id),
357 sequence_number: Some(sequence_number),
358 file_sequence_number,
359 data_file,
360 };
361 self.add_entry_inner(entry)?;
362 Ok(())
363 }
364
365 fn add_entry_inner(&mut self, entry: ManifestEntry) -> Result<()> {
366 if (entry.status == ManifestStatus::Deleted || entry.status == ManifestStatus::Existing)
368 && (entry.sequence_number.is_none() || entry.file_sequence_number.is_none())
369 {
370 return Err(Error::new(
371 ErrorKind::DataInvalid,
372 "Manifest entry with status Existing or Deleted should have sequence number",
373 ));
374 }
375
376 match entry.status {
378 ManifestStatus::Added => {
379 self.added_files += 1;
380 self.added_rows += entry.data_file.record_count;
381 }
382 ManifestStatus::Deleted => {
383 self.deleted_files += 1;
384 self.deleted_rows += entry.data_file.record_count;
385 }
386 ManifestStatus::Existing => {
387 self.existing_files += 1;
388 self.existing_rows += entry.data_file.record_count;
389 }
390 }
391 if entry.is_alive() {
392 if let Some(seq_num) = entry.sequence_number {
393 self.min_seq_num = Some(self.min_seq_num.map_or(seq_num, |v| min(v, seq_num)));
394 }
395 }
396 self.manifest_entries.push(entry);
397 Ok(())
398 }
399
400 pub async fn write_manifest_file(mut self) -> Result<ManifestFile> {
402 let partition_type = self
404 .metadata
405 .partition_spec
406 .partition_type(&self.metadata.schema)?;
407 let table_schema = &self.metadata.schema;
408 let avro_schema = match self.metadata.format_version {
409 FormatVersion::V1 => manifest_schema_v1(&partition_type)?,
410 FormatVersion::V2 | FormatVersion::V3 => manifest_schema_v2(&partition_type)?,
412 };
413 let mut avro_writer = AvroWriter::new(&avro_schema, Vec::new());
414 avro_writer.add_user_metadata(
415 "schema".to_string(),
416 to_vec(table_schema).map_err(|err| {
417 Error::new(ErrorKind::DataInvalid, "Fail to serialize table schema")
418 .with_source(err)
419 })?,
420 )?;
421 avro_writer.add_user_metadata(
422 "schema-id".to_string(),
423 table_schema.schema_id().to_string(),
424 )?;
425 avro_writer.add_user_metadata(
426 "partition-spec".to_string(),
427 to_vec(&self.metadata.partition_spec.fields()).map_err(|err| {
428 Error::new(ErrorKind::DataInvalid, "Fail to serialize partition spec")
429 .with_source(err)
430 })?,
431 )?;
432 avro_writer.add_user_metadata(
433 "partition-spec-id".to_string(),
434 self.metadata.partition_spec.spec_id().to_string(),
435 )?;
436 avro_writer.add_user_metadata(
437 "format-version".to_string(),
438 (self.metadata.format_version as u8).to_string(),
439 )?;
440 if self.metadata.format_version == FormatVersion::V2 {
441 avro_writer
442 .add_user_metadata("content".to_string(), self.metadata.content.to_string())?;
443 }
444
445 let partition_summary = self.construct_partition_summaries(&partition_type)?;
446 for entry in std::mem::take(&mut self.manifest_entries) {
448 let value = match self.metadata.format_version {
449 FormatVersion::V1 => to_value(ManifestEntryV1::try_from(entry, &partition_type)?)?
450 .resolve(&avro_schema)?,
451 FormatVersion::V2 | FormatVersion::V3 => {
453 to_value(ManifestEntryV2::try_from(entry, &partition_type)?)?
454 .resolve(&avro_schema)?
455 }
456 };
457
458 avro_writer.append(value)?;
459 }
460
461 let content = avro_writer.into_inner()?;
462 let length = content.len();
463 self.output.write(Bytes::from(content)).await?;
464
465 Ok(ManifestFile {
466 manifest_path: self.output.location().to_string(),
467 manifest_length: length as i64,
468 partition_spec_id: self.metadata.partition_spec.spec_id(),
469 content: self.metadata.content,
470 sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
473 min_sequence_number: self.min_seq_num.unwrap_or(UNASSIGNED_SEQUENCE_NUMBER),
474 added_snapshot_id: self.snapshot_id.unwrap_or(UNASSIGNED_SNAPSHOT_ID),
475 added_files_count: Some(self.added_files),
476 existing_files_count: Some(self.existing_files),
477 deleted_files_count: Some(self.deleted_files),
478 added_rows_count: Some(self.added_rows),
479 existing_rows_count: Some(self.existing_rows),
480 deleted_rows_count: Some(self.deleted_rows),
481 partitions: Some(partition_summary),
482 key_metadata: self.key_metadata,
483 first_row_id: self.first_row_id,
484 })
485 }
486}
487
488struct PartitionFieldStats {
489 partition_type: PrimitiveType,
490
491 contains_null: bool,
492 contains_nan: Option<bool>,
493 lower_bound: Option<Datum>,
494 upper_bound: Option<Datum>,
495}
496
497impl PartitionFieldStats {
498 pub(crate) fn new(partition_type: PrimitiveType) -> Self {
499 Self {
500 partition_type,
501 contains_null: false,
502 contains_nan: Some(false),
503 upper_bound: None,
504 lower_bound: None,
505 }
506 }
507
508 pub(crate) fn update(&mut self, value: Option<PrimitiveLiteral>) -> Result<()> {
509 let Some(value) = value else {
510 self.contains_null = true;
511 return Ok(());
512 };
513 if !self.partition_type.compatible(&value) {
514 return Err(Error::new(
515 ErrorKind::DataInvalid,
516 "value is not compatible with type",
517 ));
518 }
519 let value = Datum::new(self.partition_type.clone(), value);
520
521 if value.is_nan() {
522 self.contains_nan = Some(true);
523 return Ok(());
524 }
525
526 self.lower_bound = Some(self.lower_bound.take().map_or(value.clone(), |original| {
527 if value < original {
528 value.clone()
529 } else {
530 original
531 }
532 }));
533 self.upper_bound = Some(self.upper_bound.take().map_or(value.clone(), |original| {
534 if value > original { value } else { original }
535 }));
536
537 Ok(())
538 }
539
540 pub(crate) fn finish(self) -> FieldSummary {
541 FieldSummary {
542 contains_null: self.contains_null,
543 contains_nan: self.contains_nan,
544 upper_bound: self.upper_bound.map(|v| v.to_bytes().unwrap()),
545 lower_bound: self.lower_bound.map(|v| v.to_bytes().unwrap()),
546 }
547 }
548}
549
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::fs;
    use std::sync::Arc;

    use tempfile::TempDir;

    use super::*;
    use crate::io::FileIOBuilder;
    use crate::spec::{DataFileFormat, Manifest, NestedField, PrimitiveType, Schema, Struct, Type};

    /// The unpartitioned Parquet data file shared by every entry in the test.
    fn sample_data_file() -> DataFile {
        DataFile {
            content: DataContentType::Data,
            file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
            file_format: DataFileFormat::Parquet,
            partition: Struct::empty(),
            record_count: 1,
            file_size_in_bytes: 5442,
            column_sizes: HashMap::from([(1, 61), (2, 73)]),
            value_counts: HashMap::from([(1, 1), (2, 1)]),
            null_value_counts: HashMap::from([(1, 0), (2, 0)]),
            nan_value_counts: HashMap::new(),
            lower_bounds: HashMap::new(),
            upper_bounds: HashMap::new(),
            key_metadata: Some(Vec::new()),
            split_offsets: vec![4],
            equality_ids: None,
            sort_order_id: None,
            partition_spec_id: 0,
            first_row_id: None,
            referenced_data_file: None,
            content_offset: None,
            content_size_in_bytes: None,
        }
    }

    /// An entry around the sample data file with both sequence numbers set to 1.
    fn sample_entry(status: ManifestStatus, snapshot_id: Option<i64>) -> ManifestEntry {
        ManifestEntry {
            status,
            snapshot_id,
            sequence_number: Some(1),
            file_sequence_number: Some(1),
            data_file: sample_data_file(),
        }
    }

    #[tokio::test]
    async fn test_add_delete_existing() {
        let id_field = NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Int));
        let name_field = NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String));
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![Arc::new(id_field), Arc::new(name_field)])
                .build()
                .unwrap(),
        );
        let metadata = ManifestMetadata {
            schema_id: 0,
            schema: schema.clone(),
            partition_spec: PartitionSpec::builder(schema)
                .with_spec_id(0)
                .build()
                .unwrap(),
            content: ManifestContentType::Data,
            format_version: FormatVersion::V2,
        };

        let added = sample_entry(ManifestStatus::Added, None);
        let deleted = sample_entry(ManifestStatus::Deleted, Some(1));
        let existing = sample_entry(ManifestStatus::Existing, Some(1));

        // Write all three entries through a V2 data writer bound to snapshot 3.
        let tmp_dir = TempDir::new().unwrap();
        let manifest_path = tmp_dir.path().join("test_manifest.avro");
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let output_file = file_io.new_output(manifest_path.to_str().unwrap()).unwrap();
        let mut writer = ManifestWriterBuilder::new(
            output_file,
            Some(3),
            None,
            metadata.schema.clone(),
            metadata.partition_spec.clone(),
        )
        .build_v2_data();
        writer.add_entry(added.clone()).unwrap();
        writer.add_delete_entry(deleted.clone()).unwrap();
        writer.add_existing_entry(existing.clone()).unwrap();
        writer.write_manifest_file().await.unwrap();

        // Read the manifest back from disk.
        let manifest_bytes = fs::read(manifest_path).expect("read_file must succeed");
        let actual_manifest = Manifest::parse_avro(manifest_bytes.as_slice()).unwrap();

        // Added and deleted entries pick up the writer's snapshot id, and the added
        // entry loses its file sequence number; the existing entry is untouched.
        let expected_entries = vec![
            ManifestEntry {
                snapshot_id: Some(3),
                file_sequence_number: None,
                ..added
            },
            ManifestEntry {
                snapshot_id: Some(3),
                ..deleted
            },
            existing,
        ];
        assert_eq!(actual_manifest, Manifest::new(metadata, expected_entries));
    }
}