1mod _serde;
19
20mod data_file;
21pub use data_file::*;
22mod entry;
23pub use entry::*;
24mod metadata;
25pub use metadata::*;
26mod writer;
27use std::sync::Arc;
28
29use apache_avro::{Reader as AvroReader, from_value};
30pub use writer::*;
31
32use super::{
33 Datum, FormatVersion, ManifestContentType, PartitionSpec, PrimitiveType, Schema, Struct,
34 UNASSIGNED_SEQUENCE_NUMBER,
35};
36use crate::error::Result;
37use crate::{Error, ErrorKind};
38
/// An Iceberg manifest file: its metadata plus the manifest entries it tracks.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Manifest {
    // Describes schema, partition spec, content type and format version.
    metadata: ManifestMetadata,
    // Entries are `Arc`-wrapped (see `Manifest::new`) so they can be shared.
    entries: Vec<ManifestEntryRef>,
}
45
46impl Manifest {
47 pub(crate) fn try_from_avro_bytes(bs: &[u8]) -> Result<(ManifestMetadata, Vec<ManifestEntry>)> {
49 let reader = AvroReader::new(bs)?;
50
51 let meta = reader.user_metadata();
53 let metadata = ManifestMetadata::parse(meta)?;
54
55 let partition_type = metadata.partition_spec.partition_type(&metadata.schema)?;
57
58 let entries = match metadata.format_version {
59 FormatVersion::V1 => {
60 let schema = manifest_schema_v1(&partition_type)?;
61 let reader = AvroReader::with_schema(&schema, bs)?;
62 reader
63 .into_iter()
64 .map(|value| {
65 from_value::<_serde::ManifestEntryV1>(&value?)?.try_into(
66 metadata.partition_spec.spec_id(),
67 &partition_type,
68 &metadata.schema,
69 )
70 })
71 .collect::<Result<Vec<_>>>()?
72 }
73 FormatVersion::V2 | FormatVersion::V3 => {
75 let schema = manifest_schema_v2(&partition_type)?;
76 let reader = AvroReader::with_schema(&schema, bs)?;
77 reader
78 .into_iter()
79 .map(|value| {
80 from_value::<_serde::ManifestEntryV2>(&value?)?.try_into(
81 metadata.partition_spec.spec_id(),
82 &partition_type,
83 &metadata.schema,
84 )
85 })
86 .collect::<Result<Vec<_>>>()?
87 }
88 };
89
90 Ok((metadata, entries))
91 }
92
93 pub fn parse_avro(bs: &[u8]) -> Result<Self> {
95 let (metadata, entries) = Self::try_from_avro_bytes(bs)?;
96 Ok(Self::new(metadata, entries))
97 }
98
99 pub fn entries(&self) -> &[ManifestEntryRef] {
101 &self.entries
102 }
103
104 pub fn metadata(&self) -> &ManifestMetadata {
106 &self.metadata
107 }
108
109 pub fn into_parts(self) -> (Vec<ManifestEntryRef>, ManifestMetadata) {
111 let Self { entries, metadata } = self;
112 (entries, metadata)
113 }
114
115 pub fn new(metadata: ManifestMetadata, entries: Vec<ManifestEntry>) -> Self {
117 Self {
118 metadata,
119 entries: entries.into_iter().map(Arc::new).collect(),
120 }
121 }
122}
123
124pub fn serialize_data_file_to_json(
126 data_file: DataFile,
127 partition_type: &super::StructType,
128 format_version: FormatVersion,
129) -> Result<String> {
130 let serde = _serde::DataFileSerde::try_from(data_file, partition_type, format_version)?;
131 serde_json::to_string(&serde).map_err(|e| {
132 Error::new(
133 ErrorKind::DataInvalid,
134 "Failed to serialize DataFile to JSON!".to_string(),
135 )
136 .with_source(e)
137 })
138}
139
140pub fn deserialize_data_file_from_json(
142 json: &str,
143 partition_spec_id: i32,
144 partition_type: &super::StructType,
145 schema: &Schema,
146) -> Result<DataFile> {
147 let serde = serde_json::from_str::<_serde::DataFileSerde>(json).map_err(|e| {
148 Error::new(
149 ErrorKind::DataInvalid,
150 "Failed to deserialize JSON to DataFile!".to_string(),
151 )
152 .with_source(e)
153 })?;
154
155 serde.try_into(partition_spec_id, partition_type, schema)
156}
157
158#[cfg(test)]
159mod tests {
160 use std::collections::HashMap;
161 use std::fs;
162 use std::sync::Arc;
163
164 use serde_json::Value;
165 use tempfile::TempDir;
166
167 use super::*;
168 use crate::io::FileIOBuilder;
169 use crate::spec::{Literal, NestedField, PrimitiveType, Struct, Transform, Type};
170
    /// Round-trip: write a V2 manifest with an unpartitioned spec, then parse
    /// the Avro bytes back and check equality with the source entries.
    #[tokio::test]
    async fn test_parse_manifest_v2_unpartition() {
        // Schema with one optional column per primitive type exercised here.
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![
                    Arc::new(NestedField::optional(
                        1,
                        "id",
                        Type::Primitive(PrimitiveType::Long),
                    )),
                    Arc::new(NestedField::optional(
                        2,
                        "v_int",
                        Type::Primitive(PrimitiveType::Int),
                    )),
                    Arc::new(NestedField::optional(
                        3,
                        "v_long",
                        Type::Primitive(PrimitiveType::Long),
                    )),
                    Arc::new(NestedField::optional(
                        4,
                        "v_float",
                        Type::Primitive(PrimitiveType::Float),
                    )),
                    Arc::new(NestedField::optional(
                        5,
                        "v_double",
                        Type::Primitive(PrimitiveType::Double),
                    )),
                    Arc::new(NestedField::optional(
                        6,
                        "v_varchar",
                        Type::Primitive(PrimitiveType::String),
                    )),
                    Arc::new(NestedField::optional(
                        7,
                        "v_bool",
                        Type::Primitive(PrimitiveType::Boolean),
                    )),
                    Arc::new(NestedField::optional(
                        8,
                        "v_date",
                        Type::Primitive(PrimitiveType::Date),
                    )),
                    Arc::new(NestedField::optional(
                        9,
                        "v_timestamp",
                        Type::Primitive(PrimitiveType::Timestamptz),
                    )),
                    Arc::new(NestedField::optional(
                        10,
                        "v_decimal",
                        Type::Primitive(PrimitiveType::Decimal {
                            precision: 36,
                            scale: 10,
                        }),
                    )),
                    Arc::new(NestedField::optional(
                        11,
                        "v_ts_ntz",
                        Type::Primitive(PrimitiveType::Timestamp),
                    )),
                    Arc::new(NestedField::optional(
                        12,
                        "v_ts_ns_ntz",
                        Type::Primitive(PrimitiveType::TimestampNs),
                    )),
                ])
                .build()
                .unwrap(),
        );
        // Unpartitioned: no partition fields added to the spec builder.
        let metadata = ManifestMetadata {
            schema_id: 0,
            schema: schema.clone(),
            partition_spec: PartitionSpec::builder(schema)
                .with_spec_id(0)
                .build()
                .unwrap(),
            content: ManifestContentType::Data,
            format_version: FormatVersion::V2,
        };
        // One added entry; snapshot/sequence numbers intentionally unset so the
        // writer fills them in.
        let mut entries = vec![
            ManifestEntry {
                status: ManifestStatus::Added,
                snapshot_id: None,
                sequence_number: None,
                file_sequence_number: None,
                data_file: DataFile {content:DataContentType::Data,file_path:"s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),file_format:DataFileFormat::Parquet,partition:Struct::empty(),record_count:1,file_size_in_bytes:5442,column_sizes:HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),value_counts:HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),null_value_counts:HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),nan_value_counts:HashMap::new(),lower_bounds:HashMap::new(),upper_bounds:HashMap::new(),key_metadata:None,split_offsets:vec![4],equality_ids:Some(Vec::new()),sort_order_id:None, partition_spec_id: 0,first_row_id: None,referenced_data_file: None,content_offset: None,content_size_in_bytes: None }
            }
        ];

        // Write the manifest to a temporary Avro file with snapshot id 1.
        let tmp_dir = TempDir::new().unwrap();
        let path = tmp_dir.path().join("test_manifest.avro");
        let io = FileIOBuilder::new_fs_io().build().unwrap();
        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
        let mut writer = ManifestWriterBuilder::new(
            output_file,
            Some(1),
            None,
            metadata.schema.clone(),
            metadata.partition_spec.clone(),
        )
        .build_v2_data();
        for entry in &entries {
            writer.add_entry(entry.clone()).unwrap();
        }
        writer.write_manifest_file().await.unwrap();

        // Read the file back and compare against the source entries.
        let actual_manifest =
            Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
                .unwrap();
        // The writer stamps its snapshot id (1) onto the added entry.
        entries[0].snapshot_id = Some(1);
        assert_eq!(actual_manifest, Manifest::new(metadata, entries));
    }
290
    /// Round-trip: write a V2 manifest with a two-field identity partition
    /// spec, verify the freshly written file has unassigned sequence numbers,
    /// then parse the bytes back and compare.
    #[tokio::test]
    async fn test_parse_manifest_v2_partition() {
        // Same twelve-column primitive schema as the unpartitioned V2 test.
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![
                    Arc::new(NestedField::optional(
                        1,
                        "id",
                        Type::Primitive(PrimitiveType::Long),
                    )),
                    Arc::new(NestedField::optional(
                        2,
                        "v_int",
                        Type::Primitive(PrimitiveType::Int),
                    )),
                    Arc::new(NestedField::optional(
                        3,
                        "v_long",
                        Type::Primitive(PrimitiveType::Long),
                    )),
                    Arc::new(NestedField::optional(
                        4,
                        "v_float",
                        Type::Primitive(PrimitiveType::Float),
                    )),
                    Arc::new(NestedField::optional(
                        5,
                        "v_double",
                        Type::Primitive(PrimitiveType::Double),
                    )),
                    Arc::new(NestedField::optional(
                        6,
                        "v_varchar",
                        Type::Primitive(PrimitiveType::String),
                    )),
                    Arc::new(NestedField::optional(
                        7,
                        "v_bool",
                        Type::Primitive(PrimitiveType::Boolean),
                    )),
                    Arc::new(NestedField::optional(
                        8,
                        "v_date",
                        Type::Primitive(PrimitiveType::Date),
                    )),
                    Arc::new(NestedField::optional(
                        9,
                        "v_timestamp",
                        Type::Primitive(PrimitiveType::Timestamptz),
                    )),
                    Arc::new(NestedField::optional(
                        10,
                        "v_decimal",
                        Type::Primitive(PrimitiveType::Decimal {
                            precision: 36,
                            scale: 10,
                        }),
                    )),
                    Arc::new(NestedField::optional(
                        11,
                        "v_ts_ntz",
                        Type::Primitive(PrimitiveType::Timestamp),
                    )),
                    Arc::new(NestedField::optional(
                        12,
                        "v_ts_ns_ntz",
                        Type::Primitive(PrimitiveType::TimestampNs),
                    )),
                ])
                .build()
                .unwrap(),
        );
        // Partitioned by identity on v_int and v_long.
        let metadata = ManifestMetadata {
            schema_id: 0,
            schema: schema.clone(),
            partition_spec: PartitionSpec::builder(schema)
                .with_spec_id(0)
                .add_partition_field("v_int", "v_int", Transform::Identity)
                .unwrap()
                .add_partition_field("v_long", "v_long", Transform::Identity)
                .unwrap()
                .build()
                .unwrap(),
            content: ManifestContentType::Data,
            format_version: FormatVersion::V2,
        };
        let mut entries = vec![ManifestEntry {
            status: ManifestStatus::Added,
            snapshot_id: None,
            sequence_number: None,
            file_sequence_number: None,
            data_file: DataFile {
                content: DataContentType::Data,
                file_format: DataFileFormat::Parquet,
                file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
                // Partition tuple matches the (v_int, v_long) identity spec.
                partition: Struct::from_iter(
                    vec![
                        Some(Literal::int(1)),
                        Some(Literal::long(1000)),
                    ]
                    .into_iter()
                ),
                record_count: 1,
                file_size_in_bytes: 5442,
                column_sizes: HashMap::from([
                    (0, 73),
                    (6, 34),
                    (2, 73),
                    (7, 61),
                    (3, 61),
                    (5, 62),
                    (9, 79),
                    (10, 73),
                    (1, 61),
                    (4, 73),
                    (8, 73)
                ]),
                value_counts: HashMap::from([
                    (4, 1),
                    (5, 1),
                    (2, 1),
                    (0, 1),
                    (3, 1),
                    (6, 1),
                    (8, 1),
                    (1, 1),
                    (10, 1),
                    (7, 1),
                    (9, 1)
                ]),
                null_value_counts: HashMap::from([
                    (1, 0),
                    (6, 0),
                    (2, 0),
                    (8, 0),
                    (0, 0),
                    (3, 0),
                    (5, 0),
                    (9, 0),
                    (7, 0),
                    (4, 0),
                    (10, 0)
                ]),
                nan_value_counts: HashMap::new(),
                lower_bounds: HashMap::new(),
                upper_bounds: HashMap::new(),
                key_metadata: None,
                split_offsets: vec![4],
                equality_ids: Some(Vec::new()),
                sort_order_id: None,
                partition_spec_id: 0,
                first_row_id: None,
                referenced_data_file: None,
                content_offset: None,
                content_size_in_bytes: None,
            },
        }];

        // Write the manifest to a temporary Avro file with snapshot id 2.
        let tmp_dir = TempDir::new().unwrap();
        let path = tmp_dir.path().join("test_manifest.avro");
        let io = FileIOBuilder::new_fs_io().build().unwrap();
        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
        let mut writer = ManifestWriterBuilder::new(
            output_file,
            Some(2),
            None,
            metadata.schema.clone(),
            metadata.partition_spec.clone(),
        )
        .build_v2_data();
        for entry in &entries {
            writer.add_entry(entry.clone()).unwrap();
        }
        // A freshly written manifest file reports unassigned sequence numbers.
        let manifest_file = writer.write_manifest_file().await.unwrap();
        assert_eq!(manifest_file.sequence_number, UNASSIGNED_SEQUENCE_NUMBER);
        assert_eq!(
            manifest_file.min_sequence_number,
            UNASSIGNED_SEQUENCE_NUMBER
        );

        // Read the file back and compare against the source entries.
        let actual_manifest =
            Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
                .unwrap();
        // The writer stamps its snapshot id (2) onto the added entry.
        entries[0].snapshot_id = Some(2);
        assert_eq!(actual_manifest, Manifest::new(metadata, entries));
    }
480
    /// Round-trip: write a V1 manifest with an unpartitioned spec (including
    /// column bounds), parse the Avro bytes back, and compare.
    #[tokio::test]
    async fn test_parse_manifest_v1_unpartition() {
        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    Arc::new(NestedField::optional(
                        1,
                        "id",
                        Type::Primitive(PrimitiveType::Int),
                    )),
                    Arc::new(NestedField::optional(
                        2,
                        "data",
                        Type::Primitive(PrimitiveType::String),
                    )),
                    Arc::new(NestedField::optional(
                        3,
                        "comment",
                        Type::Primitive(PrimitiveType::String),
                    )),
                ])
                .build()
                .unwrap(),
        );
        let metadata = ManifestMetadata {
            schema_id: 1,
            schema: schema.clone(),
            partition_spec: PartitionSpec::builder(schema)
                .with_spec_id(0)
                .build()
                .unwrap(),
            content: ManifestContentType::Data,
            format_version: FormatVersion::V1,
        };
        // V1 entries carry explicit snapshot/sequence numbers; bounds are set
        // so the Datum round-trip is exercised too.
        let mut entries = vec![ManifestEntry {
            status: ManifestStatus::Added,
            snapshot_id: Some(0),
            sequence_number: Some(0),
            file_sequence_number: Some(0),
            data_file: DataFile {
                content: DataContentType::Data,
                file_path: "s3://testbucket/iceberg_data/iceberg_ctl/iceberg_db/iceberg_tbl/data/00000-7-45268d71-54eb-476c-b42c-942d880c04a1-00001.parquet".to_string(),
                file_format: DataFileFormat::Parquet,
                partition: Struct::empty(),
                record_count: 1,
                file_size_in_bytes: 875,
                column_sizes: HashMap::from([(1,47),(2,48),(3,52)]),
                value_counts: HashMap::from([(1,1),(2,1),(3,1)]),
                null_value_counts: HashMap::from([(1,0),(2,0),(3,0)]),
                nan_value_counts: HashMap::new(),
                lower_bounds: HashMap::from([(1,Datum::int(1)),(2,Datum::string("a")),(3,Datum::string("AC/DC"))]),
                upper_bounds: HashMap::from([(1,Datum::int(1)),(2,Datum::string("a")),(3,Datum::string("AC/DC"))]),
                key_metadata: None,
                split_offsets: vec![4],
                equality_ids: None,
                sort_order_id: Some(0),
                partition_spec_id: 0,
                first_row_id: None,
                referenced_data_file: None,
                content_offset: None,
                content_size_in_bytes: None,
            }
        }];

        // Write the manifest to a temporary Avro file with snapshot id 3.
        let tmp_dir = TempDir::new().unwrap();
        let path = tmp_dir.path().join("test_manifest.avro");
        let io = FileIOBuilder::new_fs_io().build().unwrap();
        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
        let mut writer = ManifestWriterBuilder::new(
            output_file,
            Some(3),
            None,
            metadata.schema.clone(),
            metadata.partition_spec.clone(),
        )
        .build_v1();
        for entry in &entries {
            writer.add_entry(entry.clone()).unwrap();
        }
        writer.write_manifest_file().await.unwrap();

        // Read the file back and compare against the source entries.
        let actual_manifest =
            Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
                .unwrap();
        // The writer replaces the entry's snapshot id with its own (3).
        entries[0].snapshot_id = Some(3);
        assert_eq!(actual_manifest, Manifest::new(metadata, entries));
    }
572
    /// Round-trip: write a V1 manifest partitioned by identity on `category`,
    /// check the generated partition summary bounds, then parse and compare.
    #[tokio::test]
    async fn test_parse_manifest_v1_partition() {
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![
                    Arc::new(NestedField::optional(
                        1,
                        "id",
                        Type::Primitive(PrimitiveType::Long),
                    )),
                    Arc::new(NestedField::optional(
                        2,
                        "data",
                        Type::Primitive(PrimitiveType::String),
                    )),
                    Arc::new(NestedField::optional(
                        3,
                        "category",
                        Type::Primitive(PrimitiveType::String),
                    )),
                ])
                .build()
                .unwrap(),
        );
        let metadata = ManifestMetadata {
            schema_id: 0,
            schema: schema.clone(),
            partition_spec: PartitionSpec::builder(schema)
                .add_partition_field("category", "category", Transform::Identity)
                .unwrap()
                .build()
                .unwrap(),
            content: ManifestContentType::Data,
            format_version: FormatVersion::V1,
        };
        let mut entries = vec![
            ManifestEntry {
                status: ManifestStatus::Added,
                snapshot_id: Some(0),
                sequence_number: Some(0),
                file_sequence_number: Some(0),
                data_file: DataFile {
                    content: DataContentType::Data,
                    file_path: "s3://testbucket/prod/db/sample/data/category=x/00010-1-d5c93668-1e52-41ac-92a6-bba590cbf249-00001.parquet".to_string(),
                    file_format: DataFileFormat::Parquet,
                    // Single-field partition tuple matching the identity spec.
                    partition: Struct::from_iter(
                        vec![
                            Some(
                                Literal::string("x"),
                            ),
                        ]
                        .into_iter()
                    ),
                    record_count: 1,
                    file_size_in_bytes: 874,
                    column_sizes: HashMap::from([(1, 46), (2, 48), (3, 48)]),
                    value_counts: HashMap::from([(1, 1), (2, 1), (3, 1)]),
                    null_value_counts: HashMap::from([(1, 0), (2, 0), (3, 0)]),
                    nan_value_counts: HashMap::new(),
                    lower_bounds: HashMap::from([
                        (1, Datum::long(1)),
                        (2, Datum::string("a")),
                        (3, Datum::string("x"))
                    ]),
                    upper_bounds: HashMap::from([
                        (1, Datum::long(1)),
                        (2, Datum::string("a")),
                        (3, Datum::string("x"))
                    ]),
                    key_metadata: None,
                    split_offsets: vec![4],
                    equality_ids: None,
                    sort_order_id: Some(0),
                    partition_spec_id: 0,
                    first_row_id: None,
                    referenced_data_file: None,
                    content_offset: None,
                    content_size_in_bytes: None,
                },
            }
        ];

        // Write the manifest to a temporary Avro file with snapshot id 2.
        let tmp_dir = TempDir::new().unwrap();
        let path = tmp_dir.path().join("test_manifest.avro");
        let io = FileIOBuilder::new_fs_io().build().unwrap();
        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
        let mut writer = ManifestWriterBuilder::new(
            output_file,
            Some(2),
            None,
            metadata.schema.clone(),
            metadata.partition_spec.clone(),
        )
        .build_v1();
        for entry in &entries {
            writer.add_entry(entry.clone()).unwrap();
        }
        // The written manifest-file summary has one partition-field entry whose
        // lower and upper bounds both equal the single value "x".
        let manifest_file = writer.write_manifest_file().await.unwrap();
        let partitions = manifest_file.partitions.unwrap();
        assert_eq!(partitions.len(), 1);
        assert_eq!(
            partitions[0].clone().lower_bound.unwrap(),
            Datum::string("x").to_bytes().unwrap()
        );
        assert_eq!(
            partitions[0].clone().upper_bound.unwrap(),
            Datum::string("x").to_bytes().unwrap()
        );

        // Read the file back and compare against the source entries.
        let actual_manifest =
            Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
                .unwrap();
        // The writer replaces the entry's snapshot id with its own (2).
        entries[0].snapshot_id = Some(2);
        assert_eq!(actual_manifest, Manifest::new(metadata, entries));
    }
691
    /// Schema evolution: the data file carries stats for field id 3, which is
    /// not present in the manifest schema (fields 1 and 2 only). After the
    /// write/parse round-trip, the bounds for the unknown field are expected
    /// to be dropped while everything else survives.
    #[tokio::test]
    async fn test_parse_manifest_with_schema_evolution() {
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![
                    Arc::new(NestedField::optional(
                        1,
                        "id",
                        Type::Primitive(PrimitiveType::Long),
                    )),
                    Arc::new(NestedField::optional(
                        2,
                        "v_int",
                        Type::Primitive(PrimitiveType::Int),
                    )),
                ])
                .build()
                .unwrap(),
        );
        let metadata = ManifestMetadata {
            schema_id: 0,
            schema: schema.clone(),
            partition_spec: PartitionSpec::builder(schema)
                .with_spec_id(0)
                .build()
                .unwrap(),
            content: ManifestContentType::Data,
            format_version: FormatVersion::V2,
        };
        // Input entry: note the bounds for field id 3, absent from the schema.
        let entries = vec![ManifestEntry {
            status: ManifestStatus::Added,
            snapshot_id: None,
            sequence_number: None,
            file_sequence_number: None,
            data_file: DataFile {
                content: DataContentType::Data,
                file_format: DataFileFormat::Parquet,
                file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
                partition: Struct::empty(),
                record_count: 1,
                file_size_in_bytes: 5442,
                column_sizes: HashMap::from([
                    (1, 61),
                    (2, 73),
                    (3, 61),
                ]),
                value_counts: HashMap::default(),
                null_value_counts: HashMap::default(),
                nan_value_counts: HashMap::new(),
                lower_bounds: HashMap::from([
                    (1, Datum::long(1)),
                    (2, Datum::int(2)),
                    (3, Datum::string("x"))
                ]),
                upper_bounds: HashMap::from([
                    (1, Datum::long(1)),
                    (2, Datum::int(2)),
                    (3, Datum::string("x"))
                ]),
                key_metadata: None,
                split_offsets: vec![4],
                equality_ids: None,
                sort_order_id: None,
                partition_spec_id: 0,
                first_row_id: None,
                referenced_data_file: None,
                content_offset: None,
                content_size_in_bytes: None,
            },
        }];

        // Write the manifest to a temporary Avro file with snapshot id 2.
        let tmp_dir = TempDir::new().unwrap();
        let path = tmp_dir.path().join("test_manifest.avro");
        let io = FileIOBuilder::new_fs_io().build().unwrap();
        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
        let mut writer = ManifestWriterBuilder::new(
            output_file,
            Some(2),
            None,
            metadata.schema.clone(),
            metadata.partition_spec.clone(),
        )
        .build_v2_data();
        for entry in &entries {
            writer.add_entry(entry.clone()).unwrap();
        }
        writer.write_manifest_file().await.unwrap();

        let actual_manifest =
            Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
                .unwrap();

        // Expected result: same schema, snapshot id stamped by the writer, and
        // the bounds for field id 3 removed (column_sizes keeps key 3).
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![
                    Arc::new(NestedField::optional(
                        1,
                        "id",
                        Type::Primitive(PrimitiveType::Long),
                    )),
                    Arc::new(NestedField::optional(
                        2,
                        "v_int",
                        Type::Primitive(PrimitiveType::Int),
                    )),
                ])
                .build()
                .unwrap(),
        );
        let expected_manifest = Manifest {
            metadata: ManifestMetadata {
                schema_id: 0,
                schema: schema.clone(),
                partition_spec: PartitionSpec::builder(schema).with_spec_id(0).build().unwrap(),
                content: ManifestContentType::Data,
                format_version: FormatVersion::V2,
            },
            entries: vec![Arc::new(ManifestEntry {
                status: ManifestStatus::Added,
                snapshot_id: Some(2),
                sequence_number: None,
                file_sequence_number: None,
                data_file: DataFile {
                    content: DataContentType::Data,
                    file_format: DataFileFormat::Parquet,
                    file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
                    partition: Struct::empty(),
                    record_count: 1,
                    file_size_in_bytes: 5442,
                    column_sizes: HashMap::from([
                        (1, 61),
                        (2, 73),
                        (3, 61),
                    ]),
                    value_counts: HashMap::default(),
                    null_value_counts: HashMap::default(),
                    nan_value_counts: HashMap::new(),
                    lower_bounds: HashMap::from([
                        (1, Datum::long(1)),
                        (2, Datum::int(2)),
                    ]),
                    upper_bounds: HashMap::from([
                        (1, Datum::long(1)),
                        (2, Datum::int(2)),
                    ]),
                    key_metadata: None,
                    split_offsets: vec![4],
                    equality_ids: None,
                    sort_order_id: None,
                    partition_spec_id: 0,
                    first_row_id: None,
                    referenced_data_file: None,
                    content_offset: None,
                    content_size_in_bytes: None,
                },
            })],
        };

        assert_eq!(actual_manifest, expected_manifest);
    }
857
    /// Partition summaries: write four entries with varying partition values
    /// (including a NaN float and a null) and verify the per-field summary
    /// bounds plus the contains_null / contains_nan flags.
    #[tokio::test]
    async fn test_manifest_summary() {
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![
                    Arc::new(NestedField::optional(
                        1,
                        "time",
                        Type::Primitive(PrimitiveType::Date),
                    )),
                    Arc::new(NestedField::optional(
                        2,
                        "v_float",
                        Type::Primitive(PrimitiveType::Float),
                    )),
                    Arc::new(NestedField::optional(
                        3,
                        "v_double",
                        Type::Primitive(PrimitiveType::Double),
                    )),
                ])
                .build()
                .unwrap(),
        );
        // Three partition fields: year(time), identity(v_float), identity(v_double).
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_partition_field("time", "year_of_time", Transform::Year)
            .unwrap()
            .add_partition_field("v_float", "f", Transform::Identity)
            .unwrap()
            .add_partition_field("v_double", "d", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();
        let metadata = ManifestMetadata {
            schema_id: 0,
            schema,
            partition_spec,
            content: ManifestContentType::Data,
            format_version: FormatVersion::V2,
        };
        // Partition tuples per entry:
        //   (2021, 1.0, 2.0), (1111, 15.5, 25.5), (1211, NaN, 1.0), (1111, null, 11.0)
        let entries = vec![
            ManifestEntry {
                status: ManifestStatus::Added,
                snapshot_id: None,
                sequence_number: None,
                file_sequence_number: None,
                data_file: DataFile {
                    content: DataContentType::Data,
                    file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
                    file_format: DataFileFormat::Parquet,
                    partition: Struct::from_iter(
                        vec![
                            Some(Literal::int(2021)),
                            Some(Literal::float(1.0)),
                            Some(Literal::double(2.0)),
                        ]
                    ),
                    record_count: 1,
                    file_size_in_bytes: 5442,
                    column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
                    value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
                    null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
                    nan_value_counts: HashMap::new(),
                    lower_bounds: HashMap::new(),
                    upper_bounds: HashMap::new(),
                    key_metadata: None,
                    split_offsets: vec![4],
                    equality_ids: None,
                    sort_order_id: None,
                    partition_spec_id: 0,
                    first_row_id: None,
                    referenced_data_file: None,
                    content_offset: None,
                    content_size_in_bytes: None,
                }
            },
            ManifestEntry {
                status: ManifestStatus::Added,
                snapshot_id: None,
                sequence_number: None,
                file_sequence_number: None,
                data_file: DataFile {
                    content: DataContentType::Data,
                    file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
                    file_format: DataFileFormat::Parquet,
                    partition: Struct::from_iter(
                        vec![
                            Some(Literal::int(1111)),
                            Some(Literal::float(15.5)),
                            Some(Literal::double(25.5)),
                        ]
                    ),
                    record_count: 1,
                    file_size_in_bytes: 5442,
                    column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
                    value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
                    null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
                    nan_value_counts: HashMap::new(),
                    lower_bounds: HashMap::new(),
                    upper_bounds: HashMap::new(),
                    key_metadata: None,
                    split_offsets: vec![4],
                    equality_ids: None,
                    sort_order_id: None,
                    partition_spec_id: 0,
                    first_row_id: None,
                    referenced_data_file: None,
                    content_offset: None,
                    content_size_in_bytes: None,
                }
            },
            ManifestEntry {
                status: ManifestStatus::Added,
                snapshot_id: None,
                sequence_number: None,
                file_sequence_number: None,
                data_file: DataFile {
                    content: DataContentType::Data,
                    file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
                    file_format: DataFileFormat::Parquet,
                    partition: Struct::from_iter(
                        vec![
                            Some(Literal::int(1211)),
                            // NaN float feeds the contains_nan flag below.
                            Some(Literal::float(f32::NAN)),
                            Some(Literal::double(1.0)),
                        ]
                    ),
                    record_count: 1,
                    file_size_in_bytes: 5442,
                    column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
                    value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
                    null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
                    nan_value_counts: HashMap::new(),
                    lower_bounds: HashMap::new(),
                    upper_bounds: HashMap::new(),
                    key_metadata: None,
                    split_offsets: vec![4],
                    equality_ids: None,
                    sort_order_id: None,
                    partition_spec_id: 0,
                    first_row_id: None,
                    referenced_data_file: None,
                    content_offset: None,
                    content_size_in_bytes: None,
                }
            },
            ManifestEntry {
                status: ManifestStatus::Added,
                snapshot_id: None,
                sequence_number: None,
                file_sequence_number: None,
                data_file: DataFile {
                    content: DataContentType::Data,
                    file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
                    file_format: DataFileFormat::Parquet,
                    partition: Struct::from_iter(
                        vec![
                            Some(Literal::int(1111)),
                            // Null v_float feeds the contains_null flag below.
                            None,
                            Some(Literal::double(11.0)),
                        ]
                    ),
                    record_count: 1,
                    file_size_in_bytes: 5442,
                    column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
                    value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
                    null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
                    nan_value_counts: HashMap::new(),
                    lower_bounds: HashMap::new(),
                    upper_bounds: HashMap::new(),
                    key_metadata: None,
                    split_offsets: vec![4],
                    equality_ids: None,
                    sort_order_id: None,
                    partition_spec_id: 0,
                    first_row_id: None,
                    referenced_data_file: None,
                    content_offset: None,
                    content_size_in_bytes: None,
                }
            },
        ];

        // Write all four entries and capture the returned manifest-file summary.
        let tmp_dir = TempDir::new().unwrap();
        let path = tmp_dir.path().join("test_manifest.avro");
        let io = FileIOBuilder::new_fs_io().build().unwrap();
        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
        let mut writer = ManifestWriterBuilder::new(
            output_file,
            Some(1),
            None,
            metadata.schema.clone(),
            metadata.partition_spec.clone(),
        )
        .build_v2_data();
        for entry in &entries {
            writer.add_entry(entry.clone()).unwrap();
        }
        let res = writer.write_manifest_file().await.unwrap();

        // One summary per partition field, in spec order.
        let partitions = res.partitions.unwrap();

        assert_eq!(partitions.len(), 3);
        // year_of_time: min 1111, max 2021; no nulls, no NaNs.
        assert_eq!(
            partitions[0].clone().lower_bound.unwrap(),
            Datum::int(1111).to_bytes().unwrap()
        );
        assert_eq!(
            partitions[0].clone().upper_bound.unwrap(),
            Datum::int(2021).to_bytes().unwrap()
        );
        assert!(!partitions[0].clone().contains_null);
        assert_eq!(partitions[0].clone().contains_nan, Some(false));

        // f (v_float): bounds ignore NaN/null; one null and one NaN present.
        assert_eq!(
            partitions[1].clone().lower_bound.unwrap(),
            Datum::float(1.0).to_bytes().unwrap()
        );
        assert_eq!(
            partitions[1].clone().upper_bound.unwrap(),
            Datum::float(15.5).to_bytes().unwrap()
        );
        assert!(partitions[1].clone().contains_null);
        assert_eq!(partitions[1].clone().contains_nan, Some(true));

        // d (v_double): min 1.0, max 25.5; no nulls, no NaNs.
        assert_eq!(
            partitions[2].clone().lower_bound.unwrap(),
            Datum::double(1.0).to_bytes().unwrap()
        );
        assert_eq!(
            partitions[2].clone().upper_bound.unwrap(),
            Datum::double(25.5).to_bytes().unwrap()
        );
        assert!(!partitions[2].clone().contains_null);
        assert_eq!(partitions[2].clone().contains_nan, Some(false));
    }
1096
    /// JSON round-trip for `DataFile`: serialize two builder-constructed files
    /// with `serialize_data_file_to_json`, compare against the expected JSON
    /// shape, then deserialize and check equality with the originals.
    #[test]
    fn test_data_file_serialization() {
        // Minimal two-column schema with `id` as the identifier field.
        let schema = Schema::builder()
            .with_schema_id(1)
            .with_identifier_field_ids(vec![1])
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
            ])
            .build()
            .unwrap();

        // Identity partition on `id`, spec id 1.
        let partition_spec = PartitionSpec::builder(schema.clone())
            .with_spec_id(1)
            .add_partition_field("id", "id_partition", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();

        let partition_type = partition_spec.partition_type(&schema).unwrap();

        // Two data files built via the builder with differing stats.
        let data_files = vec![
            DataFileBuilder::default()
                .content(DataContentType::Data)
                .file_format(DataFileFormat::Parquet)
                .file_path("path/to/file1.parquet".to_string())
                .file_size_in_bytes(1024)
                .record_count(100)
                .partition_spec_id(1)
                .partition(Struct::empty())
                .column_sizes(HashMap::from([(1, 512), (2, 1024)]))
                .value_counts(HashMap::from([(1, 100), (2, 500)]))
                .null_value_counts(HashMap::from([(1, 0), (2, 1)]))
                .build()
                .unwrap(),
            DataFileBuilder::default()
                .content(DataContentType::Data)
                .file_format(DataFileFormat::Parquet)
                .file_path("path/to/file2.parquet".to_string())
                .file_size_in_bytes(2048)
                .record_count(200)
                .partition_spec_id(1)
                .partition(Struct::empty())
                .column_sizes(HashMap::from([(1, 1024), (2, 2048)]))
                .value_counts(HashMap::from([(1, 200), (2, 600)]))
                .null_value_counts(HashMap::from([(1, 10), (2, 999)]))
                .build()
                .unwrap(),
        ];

        // Serialize each file to its JSON representation.
        let serialized_files = data_files
            .clone()
            .into_iter()
            .map(|f| serialize_data_file_to_json(f, &partition_type, FormatVersion::V2).unwrap())
            .collect::<Vec<String>>();

        // Compare parsed JSON (order-insensitive for object keys) against the
        // expected structure; maps serialize as key/value pair arrays.
        assert_eq!(serialized_files.len(), 2);
        let pretty_json1: Value = serde_json::from_str(serialized_files.first().unwrap()).unwrap();
        let pretty_json2: Value = serde_json::from_str(serialized_files.get(1).unwrap()).unwrap();
        let expected_serialized_file1 = serde_json::json!({
            "content": 0,
            "file_path": "path/to/file1.parquet",
            "file_format": "PARQUET",
            "partition": {},
            "record_count": 100,
            "file_size_in_bytes": 1024,
            "column_sizes": [
                { "key": 1, "value": 512 },
                { "key": 2, "value": 1024 }
            ],
            "value_counts": [
                { "key": 1, "value": 100 },
                { "key": 2, "value": 500 }
            ],
            "null_value_counts": [
                { "key": 1, "value": 0 },
                { "key": 2, "value": 1 }
            ],
            "nan_value_counts": [],
            "lower_bounds": [],
            "upper_bounds": [],
            "key_metadata": null,
            "split_offsets": [],
            "equality_ids": null,
            "sort_order_id": null,
            "first_row_id": null,
            "referenced_data_file": null,
            "content_offset": null,
            "content_size_in_bytes": null
        });
        let expected_serialized_file2 = serde_json::json!({
            "content": 0,
            "file_path": "path/to/file2.parquet",
            "file_format": "PARQUET",
            "partition": {},
            "record_count": 200,
            "file_size_in_bytes": 2048,
            "column_sizes": [
                { "key": 1, "value": 1024 },
                { "key": 2, "value": 2048 }
            ],
            "value_counts": [
                { "key": 1, "value": 200 },
                { "key": 2, "value": 600 }
            ],
            "null_value_counts": [
                { "key": 1, "value": 10 },
                { "key": 2, "value": 999 }
            ],
            "nan_value_counts": [],
            "lower_bounds": [],
            "upper_bounds": [],
            "key_metadata": null,
            "split_offsets": [],
            "equality_ids": null,
            "sort_order_id": null,
            "first_row_id": null,
            "referenced_data_file": null,
            "content_offset": null,
            "content_size_in_bytes": null
        });
        assert_eq!(pretty_json1, expected_serialized_file1);
        assert_eq!(pretty_json2, expected_serialized_file2);

        // Deserialize back and verify we recover the original DataFiles.
        let deserialized_files: Vec<DataFile> = serialized_files
            .into_iter()
            .map(|json| {
                deserialize_data_file_from_json(
                    &json,
                    partition_spec.spec_id(),
                    &partition_type,
                    &schema,
                )
                .unwrap()
            })
            .collect();

        assert_eq!(deserialized_files.len(), 2);
        let deserialized_data_file1 = deserialized_files.first().unwrap();
        let deserialized_data_file2 = deserialized_files.get(1).unwrap();
        let original_data_file1 = data_files.first().unwrap();
        let original_data_file2 = data_files.get(1).unwrap();

        assert_eq!(deserialized_data_file1, original_data_file1);
        assert_eq!(deserialized_data_file2, original_data_file2);
    }
1251}