1use std::collections::HashMap;
19
20use apache_avro::Writer;
21use bytes::Bytes;
22
23use super::_const_schema::{
24 MANIFEST_LIST_AVRO_SCHEMA_V1, MANIFEST_LIST_AVRO_SCHEMA_V2, MANIFEST_LIST_AVRO_SCHEMA_V3,
25};
26use super::_serde::{ManifestFileV1, ManifestFileV2, ManifestFileV3};
27use super::{FormatVersion, ManifestContentType, ManifestFile, UNASSIGNED_SEQUENCE_NUMBER};
28use crate::error::Result;
29use crate::io::FileWrite;
30use crate::{Error, ErrorKind};
31
32pub struct ManifestListWriter {
34 format_version: FormatVersion,
35 writer: Box<dyn FileWrite>,
36 avro_writer: Writer<'static, Vec<u8>>,
37 sequence_number: i64,
38 snapshot_id: i64,
39 next_row_id: Option<u64>,
40}
41
42impl std::fmt::Debug for ManifestListWriter {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 f.debug_struct("ManifestListWriter")
45 .field("format_version", &self.format_version)
46 .field("avro_writer", &self.avro_writer.schema())
47 .finish_non_exhaustive()
48 }
49}
50
51impl ManifestListWriter {
52 pub fn next_row_id(&self) -> Option<u64> {
54 self.next_row_id
55 }
56
57 pub fn v1(
59 writer: Box<dyn FileWrite>,
60 snapshot_id: i64,
61 parent_snapshot_id: Option<i64>,
62 ) -> Self {
63 let mut metadata = HashMap::from_iter([
64 ("snapshot-id".to_string(), snapshot_id.to_string()),
65 ("format-version".to_string(), "1".to_string()),
66 ]);
67 if let Some(parent_snapshot_id) = parent_snapshot_id {
68 metadata.insert(
69 "parent-snapshot-id".to_string(),
70 parent_snapshot_id.to_string(),
71 );
72 }
73 Self::new(FormatVersion::V1, writer, metadata, 0, snapshot_id, None)
74 }
75
76 pub fn v2(
78 writer: Box<dyn FileWrite>,
79 snapshot_id: i64,
80 parent_snapshot_id: Option<i64>,
81 sequence_number: i64,
82 ) -> Self {
83 let mut metadata = HashMap::from_iter([
84 ("snapshot-id".to_string(), snapshot_id.to_string()),
85 ("sequence-number".to_string(), sequence_number.to_string()),
86 ("format-version".to_string(), "2".to_string()),
87 ]);
88 metadata.insert(
89 "parent-snapshot-id".to_string(),
90 parent_snapshot_id
91 .map(|v| v.to_string())
92 .unwrap_or("null".to_string()),
93 );
94 Self::new(
95 FormatVersion::V2,
96 writer,
97 metadata,
98 sequence_number,
99 snapshot_id,
100 None,
101 )
102 }
103
104 pub fn v3(
106 writer: Box<dyn FileWrite>,
107 snapshot_id: i64,
108 parent_snapshot_id: Option<i64>,
109 sequence_number: i64,
110 first_row_id: Option<u64>, ) -> Self {
112 let mut metadata = HashMap::from_iter([
113 ("snapshot-id".to_string(), snapshot_id.to_string()),
114 ("sequence-number".to_string(), sequence_number.to_string()),
115 ("format-version".to_string(), "3".to_string()),
116 ]);
117 metadata.insert(
118 "parent-snapshot-id".to_string(),
119 parent_snapshot_id
120 .map(|v| v.to_string())
121 .unwrap_or("null".to_string()),
122 );
123 metadata.insert(
124 "first-row-id".to_string(),
125 first_row_id
126 .map(|v| v.to_string())
127 .unwrap_or("null".to_string()),
128 );
129 Self::new(
130 FormatVersion::V3,
131 writer,
132 metadata,
133 sequence_number,
134 snapshot_id,
135 first_row_id,
136 )
137 }
138
139 fn new(
140 format_version: FormatVersion,
141 writer: Box<dyn FileWrite>,
142 metadata: HashMap<String, String>,
143 sequence_number: i64,
144 snapshot_id: i64,
145 first_row_id: Option<u64>,
146 ) -> Self {
147 let avro_schema = match format_version {
148 FormatVersion::V1 => &MANIFEST_LIST_AVRO_SCHEMA_V1,
149 FormatVersion::V2 => &MANIFEST_LIST_AVRO_SCHEMA_V2,
150 FormatVersion::V3 => &MANIFEST_LIST_AVRO_SCHEMA_V3,
151 };
152 let mut avro_writer = Writer::new(avro_schema, Vec::new());
153 for (key, value) in metadata {
154 avro_writer
155 .add_user_metadata(key, value)
156 .expect("Avro metadata should be added to the writer before the first record.");
157 }
158 Self {
159 format_version,
160 writer,
161 avro_writer,
162 sequence_number,
163 snapshot_id,
164 next_row_id: first_row_id,
165 }
166 }
167
168 pub fn add_manifests(&mut self, manifests: impl Iterator<Item = ManifestFile>) -> Result<()> {
174 match self.format_version {
175 FormatVersion::V1 => {
176 for manifest in manifests {
177 let manifests: ManifestFileV1 = manifest.try_into()?;
178 self.avro_writer.append_ser(manifests)?;
179 }
180 }
181 FormatVersion::V2 | FormatVersion::V3 => {
182 for mut manifest in manifests {
183 self.assign_sequence_numbers(&mut manifest)?;
184
185 if self.format_version == FormatVersion::V2 {
186 let manifest_entry: ManifestFileV2 = manifest.try_into()?;
187 self.avro_writer.append_ser(manifest_entry)?;
188 } else if self.format_version == FormatVersion::V3 {
189 self.assign_first_row_id(&mut manifest)?;
190 let manifest_entry: ManifestFileV3 = manifest.try_into()?;
191 self.avro_writer.append_ser(manifest_entry)?;
192 }
193 }
194 }
195 }
196 Ok(())
197 }
198
199 pub async fn close(mut self) -> Result<()> {
201 let data = self.avro_writer.into_inner()?;
202 self.writer.write(Bytes::from(data)).await?;
203 self.writer.close().await?;
204 Ok(())
205 }
206
207 fn assign_sequence_numbers(&self, manifest: &mut ManifestFile) -> Result<()> {
209 if manifest.sequence_number == UNASSIGNED_SEQUENCE_NUMBER {
210 if manifest.added_snapshot_id != self.snapshot_id {
211 return Err(Error::new(
212 ErrorKind::DataInvalid,
213 format!(
214 "Found unassigned sequence number for a manifest from snapshot {}.",
215 manifest.added_snapshot_id
216 ),
217 ));
218 }
219 manifest.sequence_number = self.sequence_number;
220 }
221
222 if manifest.min_sequence_number == UNASSIGNED_SEQUENCE_NUMBER {
223 if manifest.added_snapshot_id != self.snapshot_id {
224 return Err(Error::new(
225 ErrorKind::DataInvalid,
226 format!(
227 "Found unassigned sequence number for a manifest from snapshot {}.",
228 manifest.added_snapshot_id
229 ),
230 ));
231 }
232 manifest.min_sequence_number = self.sequence_number;
233 }
234
235 Ok(())
236 }
237
238 fn assign_first_row_id(&mut self, manifest: &mut ManifestFile) -> Result<()> {
240 match manifest.content {
241 ManifestContentType::Data => {
242 match (self.next_row_id, manifest.first_row_id) {
243 (Some(_), Some(_)) => {
244 }
247 (None, Some(manifest_first_row_id)) => {
248 return Err(Error::new(
250 ErrorKind::Unexpected,
251 format!(
252 "Found invalid first-row-id assignment for Manifest {}. Writer does not have a next-row-id assigned, but the manifest has first-row-id assigned to {}.",
253 manifest.manifest_path, manifest_first_row_id,
254 ),
255 ));
256 }
257 (Some(writer_next_row_id), None) => {
258 let (existing_rows_count, added_rows_count) =
261 require_row_counts_in_manifest(manifest)?;
262 manifest.first_row_id = Some(writer_next_row_id);
263
264 self.next_row_id = writer_next_row_id
265 .checked_add(existing_rows_count)
266 .and_then(|sum| sum.checked_add(added_rows_count))
267 .ok_or_else(|| {
268 Error::new(
269 ErrorKind::DataInvalid,
270 format!(
271 "Row ID overflow when computing next row ID for Manifest {}. Next Row ID: {writer_next_row_id}, Existing Rows Count: {existing_rows_count}, Added Rows Count: {added_rows_count}",
272 manifest.manifest_path
273 ),
274 )
275 }).map(Some)?;
276 }
277 (None, None) => {
278 }
280 }
281 }
282 ManifestContentType::Deletes => {
283 manifest.first_row_id = None;
285 }
286 };
287
288 Ok(())
289 }
290}
291
292fn require_row_counts_in_manifest(manifest: &ManifestFile) -> Result<(u64, u64)> {
293 let existing_rows_count = manifest.existing_rows_count.ok_or_else(|| {
294 Error::new(
295 ErrorKind::DataInvalid,
296 format!(
297 "Cannot include a Manifest without existing-rows-count to a table with row lineage enabled. Manifest path: {}",
298 manifest.manifest_path,
299 ),
300 )
301 })?;
302 let added_rows_count = manifest.added_rows_count.ok_or_else(|| {
303 Error::new(
304 ErrorKind::DataInvalid,
305 format!(
306 "Cannot include a Manifest without added-rows-count to a table with row lineage enabled. Manifest path: {}",
307 manifest.manifest_path,
308 ),
309 )
310 })?;
311 Ok((existing_rows_count, added_rows_count))
312}
313
314#[cfg(test)]
315mod test {
316 use std::fs;
317 use std::path::Path;
318
319 use tempfile::TempDir;
320
321 use super::ManifestListWriter;
322 use crate::io::{FileIO, FileWrite};
323 use crate::spec::{
324 Datum, FieldSummary, ManifestContentType, ManifestFile, ManifestList,
325 UNASSIGNED_SEQUENCE_NUMBER,
326 };
327
328 #[tokio::test]
329 async fn test_manifest_list_writer_v1() {
330 let expected_manifest_list = ManifestList {
331 entries: vec![ManifestFile {
332 manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
333 manifest_length: 5806,
334 partition_spec_id: 1,
335 content: ManifestContentType::Data,
336 sequence_number: 0,
337 min_sequence_number: 0,
338 added_snapshot_id: 1646658105718557341,
339 added_files_count: Some(3),
340 existing_files_count: Some(0),
341 deleted_files_count: Some(0),
342 added_rows_count: Some(3),
343 existing_rows_count: Some(0),
344 deleted_rows_count: Some(0),
345 partitions: Some(
346 vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}],
347 ),
348 key_metadata: None,
349 first_row_id: None,
350 }]
351 };
352
353 let temp_dir = TempDir::new().unwrap();
354 let path = temp_dir.path().join("manifest_list_v1.avro");
355 let io = FileIO::new_with_fs();
356 let file_writer = file_writer(&path, io).await;
357
358 let mut writer = ManifestListWriter::v1(file_writer, 1646658105718557341, Some(0));
359 writer
360 .add_manifests(expected_manifest_list.entries.clone().into_iter())
361 .unwrap();
362 writer.close().await.unwrap();
363
364 let bs = fs::read(path).unwrap();
365
366 let manifest_list =
367 ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V1).unwrap();
368 assert_eq!(manifest_list, expected_manifest_list);
369
370 temp_dir.close().unwrap();
371 }
372
373 #[tokio::test]
374 async fn test_manifest_list_writer_v2() {
375 let snapshot_id = 377075049360453639;
376 let seq_num = 1;
377 let mut expected_manifest_list = ManifestList {
378 entries: vec![ManifestFile {
379 manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
380 manifest_length: 6926,
381 partition_spec_id: 1,
382 content: ManifestContentType::Data,
383 sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
384 min_sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
385 added_snapshot_id: snapshot_id,
386 added_files_count: Some(1),
387 existing_files_count: Some(0),
388 deleted_files_count: Some(0),
389 added_rows_count: Some(3),
390 existing_rows_count: Some(0),
391 deleted_rows_count: Some(0),
392 partitions: Some(
393 vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
394 ),
395 key_metadata: None,
396 first_row_id: None,
397 }]
398 };
399
400 let temp_dir = TempDir::new().unwrap();
401 let path = temp_dir.path().join("manifest_list_v2.avro");
402 let io = FileIO::new_with_fs();
403 let file_writer = file_writer(&path, io).await;
404
405 let mut writer = ManifestListWriter::v2(file_writer, snapshot_id, Some(0), seq_num);
406 writer
407 .add_manifests(expected_manifest_list.entries.clone().into_iter())
408 .unwrap();
409 writer.close().await.unwrap();
410
411 let bs = fs::read(path).unwrap();
412 let manifest_list =
413 ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2).unwrap();
414 expected_manifest_list.entries[0].sequence_number = seq_num;
415 expected_manifest_list.entries[0].min_sequence_number = seq_num;
416 assert_eq!(manifest_list, expected_manifest_list);
417
418 temp_dir.close().unwrap();
419 }
420
421 #[tokio::test]
422 async fn test_manifest_list_writer_v3() {
423 let snapshot_id = 377075049360453639;
424 let seq_num = 1;
425 let mut expected_manifest_list = ManifestList {
426 entries: vec![ManifestFile {
427 manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
428 manifest_length: 6926,
429 partition_spec_id: 1,
430 content: ManifestContentType::Data,
431 sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
432 min_sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
433 added_snapshot_id: snapshot_id,
434 added_files_count: Some(1),
435 existing_files_count: Some(0),
436 deleted_files_count: Some(0),
437 added_rows_count: Some(3),
438 existing_rows_count: Some(0),
439 deleted_rows_count: Some(0),
440 partitions: Some(
441 vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
442 ),
443 key_metadata: None,
444 first_row_id: Some(10),
445 }]
446 };
447
448 let temp_dir = TempDir::new().unwrap();
449 let path = temp_dir.path().join("manifest_list_v2.avro");
450 let io = FileIO::new_with_fs();
451 let file_writer = file_writer(&path, io).await;
452
453 let mut writer =
454 ManifestListWriter::v3(file_writer, snapshot_id, Some(0), seq_num, Some(10));
455 writer
456 .add_manifests(expected_manifest_list.entries.clone().into_iter())
457 .unwrap();
458 writer.close().await.unwrap();
459
460 let bs = fs::read(path).unwrap();
461 let manifest_list =
462 ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V3).unwrap();
463 expected_manifest_list.entries[0].sequence_number = seq_num;
464 expected_manifest_list.entries[0].min_sequence_number = seq_num;
465 expected_manifest_list.entries[0].first_row_id = Some(10);
466 assert_eq!(manifest_list, expected_manifest_list);
467
468 temp_dir.close().unwrap();
469 }
470
471 #[tokio::test]
472 async fn test_manifest_list_writer_v1_as_v2() {
473 let expected_manifest_list = ManifestList {
474 entries: vec![ManifestFile {
475 manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
476 manifest_length: 5806,
477 partition_spec_id: 1,
478 content: ManifestContentType::Data,
479 sequence_number: 0,
480 min_sequence_number: 0,
481 added_snapshot_id: 1646658105718557341,
482 added_files_count: Some(3),
483 existing_files_count: Some(0),
484 deleted_files_count: Some(0),
485 added_rows_count: Some(3),
486 existing_rows_count: Some(0),
487 deleted_rows_count: Some(0),
488 partitions: Some(
489 vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
490 ),
491 key_metadata: None,
492 first_row_id: None,
493 }]
494 };
495
496 let temp_dir = TempDir::new().unwrap();
497 let path = temp_dir.path().join("manifest_list_v1.avro");
498 let io = FileIO::new_with_fs();
499 let file_writer = file_writer(&path, io).await;
500
501 let mut writer = ManifestListWriter::v1(file_writer, 1646658105718557341, Some(0));
502 writer
503 .add_manifests(expected_manifest_list.entries.clone().into_iter())
504 .unwrap();
505 writer.close().await.unwrap();
506
507 let bs = fs::read(path).unwrap();
508
509 let manifest_list =
510 ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2).unwrap();
511 assert_eq!(manifest_list, expected_manifest_list);
512
513 temp_dir.close().unwrap();
514 }
515
516 #[tokio::test]
517 async fn test_manifest_list_writer_v1_as_v3() {
518 let expected_manifest_list = ManifestList {
519 entries: vec![ManifestFile {
520 manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
521 manifest_length: 5806,
522 partition_spec_id: 1,
523 content: ManifestContentType::Data,
524 sequence_number: 0,
525 min_sequence_number: 0,
526 added_snapshot_id: 1646658105718557341,
527 added_files_count: Some(3),
528 existing_files_count: Some(0),
529 deleted_files_count: Some(0),
530 added_rows_count: Some(3),
531 existing_rows_count: Some(0),
532 deleted_rows_count: Some(0),
533 partitions: Some(
534 vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
535 ),
536 key_metadata: None,
537 first_row_id: None,
538 }]
539 };
540
541 let temp_dir = TempDir::new().unwrap();
542 let path = temp_dir.path().join("manifest_list_v1.avro");
543 let io = FileIO::new_with_fs();
544 let file_writer = file_writer(&path, io).await;
545
546 let mut writer = ManifestListWriter::v1(file_writer, 1646658105718557341, Some(0));
547 writer
548 .add_manifests(expected_manifest_list.entries.clone().into_iter())
549 .unwrap();
550 writer.close().await.unwrap();
551
552 let bs = fs::read(path).unwrap();
553
554 let manifest_list =
555 ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V3).unwrap();
556 assert_eq!(manifest_list, expected_manifest_list);
557
558 temp_dir.close().unwrap();
559 }
560
561 #[tokio::test]
562 async fn test_manifest_list_writer_v2_as_v3() {
563 let snapshot_id = 377075049360453639;
564 let seq_num = 1;
565 let mut expected_manifest_list = ManifestList {
566 entries: vec![ManifestFile {
567 manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
568 manifest_length: 6926,
569 partition_spec_id: 1,
570 content: ManifestContentType::Data,
571 sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
572 min_sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
573 added_snapshot_id: snapshot_id,
574 added_files_count: Some(1),
575 existing_files_count: Some(0),
576 deleted_files_count: Some(0),
577 added_rows_count: Some(3),
578 existing_rows_count: Some(0),
579 deleted_rows_count: Some(0),
580 partitions: Some(
581 vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
582 ),
583 key_metadata: None,
584 first_row_id: None,
585 }]
586 };
587
588 let temp_dir = TempDir::new().unwrap();
589 let path = temp_dir.path().join("manifest_list_v2.avro");
590 let io = FileIO::new_with_fs();
591 let file_writer = file_writer(&path, io).await;
592
593 let mut writer = ManifestListWriter::v2(file_writer, snapshot_id, Some(0), seq_num);
594 writer
595 .add_manifests(expected_manifest_list.entries.clone().into_iter())
596 .unwrap();
597 writer.close().await.unwrap();
598
599 let bs = fs::read(path).unwrap();
600
601 let manifest_list =
602 ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V3).unwrap();
603 expected_manifest_list.entries[0].sequence_number = seq_num;
604 expected_manifest_list.entries[0].min_sequence_number = seq_num;
605 assert_eq!(manifest_list, expected_manifest_list);
606
607 temp_dir.close().unwrap();
608 }
609
610 async fn file_writer(path: &Path, io: FileIO) -> Box<dyn FileWrite> {
611 io.new_output(path.to_str().unwrap())
612 .unwrap()
613 .writer()
614 .await
615 .unwrap()
616 }
617}