1mod _serde;
19
20mod data_file;
21pub use data_file::*;
22mod entry;
23pub use entry::*;
24mod metadata;
25pub use metadata::*;
26mod writer;
27use std::sync::Arc;
28
29use apache_avro::{Reader as AvroReader, from_value};
30pub use writer::*;
31
32use super::{
33 Datum, FormatVersion, ManifestContentType, PartitionSpec, PrimitiveType, Schema, Struct,
34 UNASSIGNED_SEQUENCE_NUMBER,
35};
36use crate::error::Result;
37use crate::{Error, ErrorKind};
38
/// A manifest: its [`ManifestMetadata`] together with the entries it lists.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Manifest {
    /// Metadata describing this manifest.
    metadata: ManifestMetadata,
    /// The manifest's entries, held as [`ManifestEntryRef`]s.
    entries: Vec<ManifestEntryRef>,
}
45
46impl Manifest {
47 pub(crate) fn try_from_avro_bytes(bs: &[u8]) -> Result<(ManifestMetadata, Vec<ManifestEntry>)> {
49 let reader = AvroReader::new(bs)?;
50
51 let meta = reader.user_metadata();
53 let metadata = ManifestMetadata::parse(meta)?;
54
55 let partition_type = metadata.partition_spec.partition_type(&metadata.schema)?;
57
58 let entries = match metadata.format_version {
59 FormatVersion::V1 => {
60 let schema = manifest_schema_v1(&partition_type)?;
61 let reader = AvroReader::with_schema(&schema, bs)?;
62 reader
63 .into_iter()
64 .map(|value| {
65 from_value::<_serde::ManifestEntryV1>(&value?)?.try_into(
66 metadata.partition_spec.spec_id(),
67 &partition_type,
68 &metadata.schema,
69 )
70 })
71 .collect::<Result<Vec<_>>>()?
72 }
73 FormatVersion::V2 | FormatVersion::V3 => {
75 let schema = manifest_schema_v2(&partition_type)?;
76 let reader = AvroReader::with_schema(&schema, bs)?;
77 reader
78 .into_iter()
79 .map(|value| {
80 from_value::<_serde::ManifestEntryV2>(&value?)?.try_into(
81 metadata.partition_spec.spec_id(),
82 &partition_type,
83 &metadata.schema,
84 )
85 })
86 .collect::<Result<Vec<_>>>()?
87 }
88 };
89
90 Ok((metadata, entries))
91 }
92
93 pub fn parse_avro(bs: &[u8]) -> Result<Self> {
95 let (metadata, entries) = Self::try_from_avro_bytes(bs)?;
96 Ok(Self::new(metadata, entries))
97 }
98
99 pub fn entries(&self) -> &[ManifestEntryRef] {
101 &self.entries
102 }
103
104 pub fn metadata(&self) -> &ManifestMetadata {
106 &self.metadata
107 }
108
109 pub fn into_parts(self) -> (Vec<ManifestEntryRef>, ManifestMetadata) {
111 let Self { entries, metadata } = self;
112 (entries, metadata)
113 }
114
115 pub fn new(metadata: ManifestMetadata, entries: Vec<ManifestEntry>) -> Self {
117 Self {
118 metadata,
119 entries: entries.into_iter().map(Arc::new).collect(),
120 }
121 }
122}
123
124pub fn serialize_data_file_to_json(
126 data_file: DataFile,
127 partition_type: &super::StructType,
128 format_version: FormatVersion,
129) -> Result<String> {
130 let serde = _serde::DataFileSerde::try_from(data_file, partition_type, format_version)?;
131 serde_json::to_string(&serde).map_err(|e| {
132 Error::new(
133 ErrorKind::DataInvalid,
134 "Failed to serialize DataFile to JSON!".to_string(),
135 )
136 .with_source(e)
137 })
138}
139
140pub fn deserialize_data_file_from_json(
142 json: &str,
143 partition_spec_id: i32,
144 partition_type: &super::StructType,
145 schema: &Schema,
146) -> Result<DataFile> {
147 let serde = serde_json::from_str::<_serde::DataFileSerde>(json).map_err(|e| {
148 Error::new(
149 ErrorKind::DataInvalid,
150 "Failed to deserialize JSON to DataFile!".to_string(),
151 )
152 .with_source(e)
153 })?;
154
155 serde.try_into(partition_spec_id, partition_type, schema)
156}
157
158#[cfg(test)]
159mod tests {
160 use std::collections::HashMap;
161 use std::fs;
162 use std::sync::Arc;
163
164 use apache_avro::{Codec, Writer, to_value};
165 use serde_json::{Value, to_vec};
166 use tempfile::TempDir;
167
168 use super::*;
169 use crate::io::FileIO;
170 use crate::spec::{Literal, NestedField, PrimitiveType, Struct, Transform, Type};
171
172 #[tokio::test]
173 async fn test_parse_manifest_v2_unpartition() {
174 let schema = Arc::new(
175 Schema::builder()
176 .with_fields(vec![
177 Arc::new(NestedField::optional(
179 1,
180 "id",
181 Type::Primitive(PrimitiveType::Long),
182 )),
183 Arc::new(NestedField::optional(
184 2,
185 "v_int",
186 Type::Primitive(PrimitiveType::Int),
187 )),
188 Arc::new(NestedField::optional(
189 3,
190 "v_long",
191 Type::Primitive(PrimitiveType::Long),
192 )),
193 Arc::new(NestedField::optional(
194 4,
195 "v_float",
196 Type::Primitive(PrimitiveType::Float),
197 )),
198 Arc::new(NestedField::optional(
199 5,
200 "v_double",
201 Type::Primitive(PrimitiveType::Double),
202 )),
203 Arc::new(NestedField::optional(
204 6,
205 "v_varchar",
206 Type::Primitive(PrimitiveType::String),
207 )),
208 Arc::new(NestedField::optional(
209 7,
210 "v_bool",
211 Type::Primitive(PrimitiveType::Boolean),
212 )),
213 Arc::new(NestedField::optional(
214 8,
215 "v_date",
216 Type::Primitive(PrimitiveType::Date),
217 )),
218 Arc::new(NestedField::optional(
219 9,
220 "v_timestamp",
221 Type::Primitive(PrimitiveType::Timestamptz),
222 )),
223 Arc::new(NestedField::optional(
224 10,
225 "v_decimal",
226 Type::Primitive(PrimitiveType::Decimal {
227 precision: 36,
228 scale: 10,
229 }),
230 )),
231 Arc::new(NestedField::optional(
232 11,
233 "v_ts_ntz",
234 Type::Primitive(PrimitiveType::Timestamp),
235 )),
236 Arc::new(NestedField::optional(
237 12,
238 "v_ts_ns_ntz",
239 Type::Primitive(PrimitiveType::TimestampNs),
240 )),
241 ])
242 .build()
243 .unwrap(),
244 );
245 let metadata = ManifestMetadata {
246 schema_id: 0,
247 schema: schema.clone(),
248 partition_spec: PartitionSpec::builder(schema)
249 .with_spec_id(0)
250 .build()
251 .unwrap(),
252 content: ManifestContentType::Data,
253 format_version: FormatVersion::V2,
254 };
255 let mut entries = vec![
256 ManifestEntry {
257 status: ManifestStatus::Added,
258 snapshot_id: None,
259 sequence_number: None,
260 file_sequence_number: None,
261 data_file: DataFile {content:DataContentType::Data,file_path:"s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),file_format:DataFileFormat::Parquet,partition:Struct::empty(),record_count:1,file_size_in_bytes:5442,column_sizes:HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),value_counts:HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),null_value_counts:HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),nan_value_counts:HashMap::new(),lower_bounds:HashMap::new(),upper_bounds:HashMap::new(),key_metadata:None,split_offsets:Some(vec![4]),equality_ids:Some(Vec::new()),sort_order_id:None, partition_spec_id: 0,first_row_id: None,referenced_data_file: None,content_offset: None,content_size_in_bytes: None }
262 }
263 ];
264
265 let tmp_dir = TempDir::new().unwrap();
267 let path = tmp_dir.path().join("test_manifest.avro");
268 let io = FileIO::new_with_fs();
269 let output_file = io.new_output(path.to_str().unwrap()).unwrap();
270 let mut writer = ManifestWriterBuilder::new(
271 output_file,
272 Some(1),
273 None,
274 metadata.schema.clone(),
275 metadata.partition_spec.clone(),
276 )
277 .build_v2_data();
278 for entry in &entries {
279 writer.add_entry(entry.clone()).unwrap();
280 }
281 writer.write_manifest_file().await.unwrap();
282
283 let actual_manifest =
285 Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
286 .unwrap();
287 entries[0].snapshot_id = Some(1);
289 assert_eq!(actual_manifest, Manifest::new(metadata, entries));
290 }
291
/// Builds Snappy-compressed V2 manifests in memory (one data manifest, one
/// delete manifest) with the Avro writer directly and checks that
/// `Manifest::parse_avro` decodes each of them.
#[test]
fn test_parse_snappy_manifest_v2() {
    let id_field = NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Long));
    let schema = Arc::new(
        Schema::builder()
            .with_fields(vec![Arc::new(id_field)])
            .build()
            .unwrap(),
    );
    let partition_spec = PartitionSpec::builder(schema.clone())
        .with_spec_id(0)
        .build()
        .unwrap();

    // (manifest content, data file content, data file path)
    let cases = [
        (
            ManifestContentType::Data,
            DataContentType::Data,
            "s3://bucket/table/data/data.parquet",
        ),
        (
            ManifestContentType::Deletes,
            DataContentType::PositionDeletes,
            "s3://bucket/table/data/delete.parquet",
        ),
    ];

    for (manifest_content, file_content, file_path) in cases {
        let metadata = ManifestMetadata {
            schema_id: 0,
            schema: schema.clone(),
            partition_spec: partition_spec.clone(),
            content: manifest_content,
            format_version: FormatVersion::V2,
        };
        let data_file = DataFile {
            content: file_content,
            file_path: file_path.to_string(),
            file_format: DataFileFormat::Parquet,
            partition: Struct::empty(),
            record_count: 1,
            file_size_in_bytes: 1024,
            column_sizes: HashMap::new(),
            value_counts: HashMap::new(),
            null_value_counts: HashMap::new(),
            nan_value_counts: HashMap::new(),
            lower_bounds: HashMap::new(),
            upper_bounds: HashMap::new(),
            key_metadata: None,
            split_offsets: None,
            equality_ids: None,
            sort_order_id: None,
            partition_spec_id: 0,
            first_row_id: None,
            referenced_data_file: None,
            content_offset: None,
            content_size_in_bytes: None,
        };
        let entry = ManifestEntry {
            status: ManifestStatus::Added,
            snapshot_id: Some(1),
            sequence_number: None,
            file_sequence_number: None,
            data_file,
        };

        let partition_type = metadata
            .partition_spec
            .partition_type(&metadata.schema)
            .unwrap();
        let avro_schema = manifest_schema_v2(&partition_type).unwrap();
        let mut writer = Writer::with_codec(&avro_schema, Vec::new(), Codec::Snappy);

        // User metadata keys that `Manifest::parse_avro` reads back, with
        // their values as raw bytes.
        let user_metadata: Vec<(&str, Vec<u8>)> = vec![
            ("schema", to_vec(&metadata.schema).unwrap()),
            (
                "schema-id",
                metadata.schema.schema_id().to_string().into_bytes(),
            ),
            (
                "partition-spec",
                to_vec(&metadata.partition_spec.fields()).unwrap(),
            ),
            (
                "partition-spec-id",
                metadata.partition_spec.spec_id().to_string().into_bytes(),
            ),
            (
                "format-version",
                (metadata.format_version as u8).to_string().into_bytes(),
            ),
            ("content", metadata.content.to_string().into_bytes()),
        ];
        for (key, value) in user_metadata {
            writer.add_user_metadata(key.to_string(), value).unwrap();
        }

        let serde_entry =
            _serde::ManifestEntryV2::try_from(entry.clone(), &partition_type).unwrap();
        let record = to_value(serde_entry)
            .unwrap()
            .resolve(&avro_schema)
            .unwrap();
        writer.append(record).unwrap();
        let bs = writer.into_inner().unwrap();

        let parsed_manifest = Manifest::parse_avro(&bs).unwrap();

        assert_eq!(parsed_manifest, Manifest::new(metadata, vec![entry]));
    }
}
408
409 #[tokio::test]
410 async fn test_parse_manifest_v2_partition() {
411 let schema = Arc::new(
412 Schema::builder()
413 .with_fields(vec![
414 Arc::new(NestedField::optional(
415 1,
416 "id",
417 Type::Primitive(PrimitiveType::Long),
418 )),
419 Arc::new(NestedField::optional(
420 2,
421 "v_int",
422 Type::Primitive(PrimitiveType::Int),
423 )),
424 Arc::new(NestedField::optional(
425 3,
426 "v_long",
427 Type::Primitive(PrimitiveType::Long),
428 )),
429 Arc::new(NestedField::optional(
430 4,
431 "v_float",
432 Type::Primitive(PrimitiveType::Float),
433 )),
434 Arc::new(NestedField::optional(
435 5,
436 "v_double",
437 Type::Primitive(PrimitiveType::Double),
438 )),
439 Arc::new(NestedField::optional(
440 6,
441 "v_varchar",
442 Type::Primitive(PrimitiveType::String),
443 )),
444 Arc::new(NestedField::optional(
445 7,
446 "v_bool",
447 Type::Primitive(PrimitiveType::Boolean),
448 )),
449 Arc::new(NestedField::optional(
450 8,
451 "v_date",
452 Type::Primitive(PrimitiveType::Date),
453 )),
454 Arc::new(NestedField::optional(
455 9,
456 "v_timestamp",
457 Type::Primitive(PrimitiveType::Timestamptz),
458 )),
459 Arc::new(NestedField::optional(
460 10,
461 "v_decimal",
462 Type::Primitive(PrimitiveType::Decimal {
463 precision: 36,
464 scale: 10,
465 }),
466 )),
467 Arc::new(NestedField::optional(
468 11,
469 "v_ts_ntz",
470 Type::Primitive(PrimitiveType::Timestamp),
471 )),
472 Arc::new(NestedField::optional(
473 12,
474 "v_ts_ns_ntz",
475 Type::Primitive(PrimitiveType::TimestampNs),
476 )),
477 ])
478 .build()
479 .unwrap(),
480 );
481 let metadata = ManifestMetadata {
482 schema_id: 0,
483 schema: schema.clone(),
484 partition_spec: PartitionSpec::builder(schema)
485 .with_spec_id(0)
486 .add_partition_field("v_int", "v_int", Transform::Identity)
487 .unwrap()
488 .add_partition_field("v_long", "v_long", Transform::Identity)
489 .unwrap()
490 .build()
491 .unwrap(),
492 content: ManifestContentType::Data,
493 format_version: FormatVersion::V2,
494 };
495 let mut entries = vec![ManifestEntry {
496 status: ManifestStatus::Added,
497 snapshot_id: None,
498 sequence_number: None,
499 file_sequence_number: None,
500 data_file: DataFile {
501 content: DataContentType::Data,
502 file_format: DataFileFormat::Parquet,
503 file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
504 partition: Struct::from_iter(
505 vec![
506 Some(Literal::int(1)),
507 Some(Literal::long(1000)),
508 ]
509 .into_iter()
510 ),
511 record_count: 1,
512 file_size_in_bytes: 5442,
513 column_sizes: HashMap::from([
514 (0, 73),
515 (6, 34),
516 (2, 73),
517 (7, 61),
518 (3, 61),
519 (5, 62),
520 (9, 79),
521 (10, 73),
522 (1, 61),
523 (4, 73),
524 (8, 73)
525 ]),
526 value_counts: HashMap::from([
527 (4, 1),
528 (5, 1),
529 (2, 1),
530 (0, 1),
531 (3, 1),
532 (6, 1),
533 (8, 1),
534 (1, 1),
535 (10, 1),
536 (7, 1),
537 (9, 1)
538 ]),
539 null_value_counts: HashMap::from([
540 (1, 0),
541 (6, 0),
542 (2, 0),
543 (8, 0),
544 (0, 0),
545 (3, 0),
546 (5, 0),
547 (9, 0),
548 (7, 0),
549 (4, 0),
550 (10, 0)
551 ]),
552 nan_value_counts: HashMap::new(),
553 lower_bounds: HashMap::new(),
554 upper_bounds: HashMap::new(),
555 key_metadata: None,
556 split_offsets: Some(vec![4]),
557 equality_ids: Some(Vec::new()),
558 sort_order_id: None,
559 partition_spec_id: 0,
560 first_row_id: None,
561 referenced_data_file: None,
562 content_offset: None,
563 content_size_in_bytes: None,
564 },
565 }];
566
567 let tmp_dir = TempDir::new().unwrap();
569 let path = tmp_dir.path().join("test_manifest.avro");
570 let io = FileIO::new_with_fs();
571 let output_file = io.new_output(path.to_str().unwrap()).unwrap();
572 let mut writer = ManifestWriterBuilder::new(
573 output_file,
574 Some(2),
575 None,
576 metadata.schema.clone(),
577 metadata.partition_spec.clone(),
578 )
579 .build_v2_data();
580 for entry in &entries {
581 writer.add_entry(entry.clone()).unwrap();
582 }
583 let manifest_file = writer.write_manifest_file().await.unwrap();
584 assert_eq!(manifest_file.sequence_number, UNASSIGNED_SEQUENCE_NUMBER);
585 assert_eq!(
586 manifest_file.min_sequence_number,
587 UNASSIGNED_SEQUENCE_NUMBER
588 );
589
590 let actual_manifest =
592 Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
593 .unwrap();
594 entries[0].snapshot_id = Some(2);
596 assert_eq!(actual_manifest, Manifest::new(metadata, entries));
597 }
598
599 #[tokio::test]
600 async fn test_parse_manifest_v1_unpartition() {
601 let schema = Arc::new(
602 Schema::builder()
603 .with_schema_id(1)
604 .with_fields(vec![
605 Arc::new(NestedField::optional(
606 1,
607 "id",
608 Type::Primitive(PrimitiveType::Int),
609 )),
610 Arc::new(NestedField::optional(
611 2,
612 "data",
613 Type::Primitive(PrimitiveType::String),
614 )),
615 Arc::new(NestedField::optional(
616 3,
617 "comment",
618 Type::Primitive(PrimitiveType::String),
619 )),
620 ])
621 .build()
622 .unwrap(),
623 );
624 let metadata = ManifestMetadata {
625 schema_id: 1,
626 schema: schema.clone(),
627 partition_spec: PartitionSpec::builder(schema)
628 .with_spec_id(0)
629 .build()
630 .unwrap(),
631 content: ManifestContentType::Data,
632 format_version: FormatVersion::V1,
633 };
634 let mut entries = vec![ManifestEntry {
635 status: ManifestStatus::Added,
636 snapshot_id: Some(0),
637 sequence_number: Some(0),
638 file_sequence_number: Some(0),
639 data_file: DataFile {
640 content: DataContentType::Data,
641 file_path: "s3://testbucket/iceberg_data/iceberg_ctl/iceberg_db/iceberg_tbl/data/00000-7-45268d71-54eb-476c-b42c-942d880c04a1-00001.parquet".to_string(),
642 file_format: DataFileFormat::Parquet,
643 partition: Struct::empty(),
644 record_count: 1,
645 file_size_in_bytes: 875,
646 column_sizes: HashMap::from([(1,47),(2,48),(3,52)]),
647 value_counts: HashMap::from([(1,1),(2,1),(3,1)]),
648 null_value_counts: HashMap::from([(1,0),(2,0),(3,0)]),
649 nan_value_counts: HashMap::new(),
650 lower_bounds: HashMap::from([(1,Datum::int(1)),(2,Datum::string("a")),(3,Datum::string("AC/DC"))]),
651 upper_bounds: HashMap::from([(1,Datum::int(1)),(2,Datum::string("a")),(3,Datum::string("AC/DC"))]),
652 key_metadata: None,
653 split_offsets: Some(vec![4]),
654 equality_ids: None,
655 sort_order_id: Some(0),
656 partition_spec_id: 0,
657 first_row_id: None,
658 referenced_data_file: None,
659 content_offset: None,
660 content_size_in_bytes: None,
661 }
662 }];
663
664 let tmp_dir = TempDir::new().unwrap();
666 let path = tmp_dir.path().join("test_manifest.avro");
667 let io = FileIO::new_with_fs();
668 let output_file = io.new_output(path.to_str().unwrap()).unwrap();
669 let mut writer = ManifestWriterBuilder::new(
670 output_file,
671 Some(3),
672 None,
673 metadata.schema.clone(),
674 metadata.partition_spec.clone(),
675 )
676 .build_v1();
677 for entry in &entries {
678 writer.add_entry(entry.clone()).unwrap();
679 }
680 writer.write_manifest_file().await.unwrap();
681
682 let actual_manifest =
684 Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
685 .unwrap();
686 entries[0].snapshot_id = Some(3);
688 assert_eq!(actual_manifest, Manifest::new(metadata, entries));
689 }
690
691 #[tokio::test]
692 async fn test_parse_manifest_v1_partition() {
693 let schema = Arc::new(
694 Schema::builder()
695 .with_fields(vec![
696 Arc::new(NestedField::optional(
697 1,
698 "id",
699 Type::Primitive(PrimitiveType::Long),
700 )),
701 Arc::new(NestedField::optional(
702 2,
703 "data",
704 Type::Primitive(PrimitiveType::String),
705 )),
706 Arc::new(NestedField::optional(
707 3,
708 "category",
709 Type::Primitive(PrimitiveType::String),
710 )),
711 ])
712 .build()
713 .unwrap(),
714 );
715 let metadata = ManifestMetadata {
716 schema_id: 0,
717 schema: schema.clone(),
718 partition_spec: PartitionSpec::builder(schema)
719 .add_partition_field("category", "category", Transform::Identity)
720 .unwrap()
721 .build()
722 .unwrap(),
723 content: ManifestContentType::Data,
724 format_version: FormatVersion::V1,
725 };
726 let mut entries = vec![
727 ManifestEntry {
728 status: ManifestStatus::Added,
729 snapshot_id: Some(0),
730 sequence_number: Some(0),
731 file_sequence_number: Some(0),
732 data_file: DataFile {
733 content: DataContentType::Data,
734 file_path: "s3://testbucket/prod/db/sample/data/category=x/00010-1-d5c93668-1e52-41ac-92a6-bba590cbf249-00001.parquet".to_string(),
735 file_format: DataFileFormat::Parquet,
736 partition: Struct::from_iter(
737 vec![
738 Some(
739 Literal::string("x"),
740 ),
741 ]
742 .into_iter()
743 ),
744 record_count: 1,
745 file_size_in_bytes: 874,
746 column_sizes: HashMap::from([(1, 46), (2, 48), (3, 48)]),
747 value_counts: HashMap::from([(1, 1), (2, 1), (3, 1)]),
748 null_value_counts: HashMap::from([(1, 0), (2, 0), (3, 0)]),
749 nan_value_counts: HashMap::new(),
750 lower_bounds: HashMap::from([
751 (1, Datum::long(1)),
752 (2, Datum::string("a")),
753 (3, Datum::string("x"))
754 ]),
755 upper_bounds: HashMap::from([
756 (1, Datum::long(1)),
757 (2, Datum::string("a")),
758 (3, Datum::string("x"))
759 ]),
760 key_metadata: None,
761 split_offsets: Some(vec![4]),
762 equality_ids: None,
763 sort_order_id: Some(0),
764 partition_spec_id: 0,
765 first_row_id: None,
766 referenced_data_file: None,
767 content_offset: None,
768 content_size_in_bytes: None,
769 },
770 }
771 ];
772
773 let tmp_dir = TempDir::new().unwrap();
775 let path = tmp_dir.path().join("test_manifest.avro");
776 let io = FileIO::new_with_fs();
777 let output_file = io.new_output(path.to_str().unwrap()).unwrap();
778 let mut writer = ManifestWriterBuilder::new(
779 output_file,
780 Some(2),
781 None,
782 metadata.schema.clone(),
783 metadata.partition_spec.clone(),
784 )
785 .build_v1();
786 for entry in &entries {
787 writer.add_entry(entry.clone()).unwrap();
788 }
789 let manifest_file = writer.write_manifest_file().await.unwrap();
790 let partitions = manifest_file.partitions.unwrap();
791 assert_eq!(partitions.len(), 1);
792 assert_eq!(
793 partitions[0].clone().lower_bound.unwrap(),
794 Datum::string("x").to_bytes().unwrap()
795 );
796 assert_eq!(
797 partitions[0].clone().upper_bound.unwrap(),
798 Datum::string("x").to_bytes().unwrap()
799 );
800
801 let actual_manifest =
803 Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
804 .unwrap();
805 entries[0].snapshot_id = Some(2);
807 assert_eq!(actual_manifest, Manifest::new(metadata, entries));
808 }
809
810 #[tokio::test]
811 async fn test_parse_manifest_with_schema_evolution() {
812 let schema = Arc::new(
813 Schema::builder()
814 .with_fields(vec![
815 Arc::new(NestedField::optional(
816 1,
817 "id",
818 Type::Primitive(PrimitiveType::Long),
819 )),
820 Arc::new(NestedField::optional(
821 2,
822 "v_int",
823 Type::Primitive(PrimitiveType::Int),
824 )),
825 ])
826 .build()
827 .unwrap(),
828 );
829 let metadata = ManifestMetadata {
830 schema_id: 0,
831 schema: schema.clone(),
832 partition_spec: PartitionSpec::builder(schema)
833 .with_spec_id(0)
834 .build()
835 .unwrap(),
836 content: ManifestContentType::Data,
837 format_version: FormatVersion::V2,
838 };
839 let entries = vec![ManifestEntry {
840 status: ManifestStatus::Added,
841 snapshot_id: None,
842 sequence_number: None,
843 file_sequence_number: None,
844 data_file: DataFile {
845 content: DataContentType::Data,
846 file_format: DataFileFormat::Parquet,
847 file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
848 partition: Struct::empty(),
849 record_count: 1,
850 file_size_in_bytes: 5442,
851 column_sizes: HashMap::from([
852 (1, 61),
853 (2, 73),
854 (3, 61),
855 ]),
856 value_counts: HashMap::default(),
857 null_value_counts: HashMap::default(),
858 nan_value_counts: HashMap::new(),
859 lower_bounds: HashMap::from([
860 (1, Datum::long(1)),
861 (2, Datum::int(2)),
862 (3, Datum::string("x"))
863 ]),
864 upper_bounds: HashMap::from([
865 (1, Datum::long(1)),
866 (2, Datum::int(2)),
867 (3, Datum::string("x"))
868 ]),
869 key_metadata: None,
870 split_offsets: Some(vec![4]),
871 equality_ids: None,
872 sort_order_id: None,
873 partition_spec_id: 0,
874 first_row_id: None,
875 referenced_data_file: None,
876 content_offset: None,
877 content_size_in_bytes: None,
878 },
879 }];
880
881 let tmp_dir = TempDir::new().unwrap();
883 let path = tmp_dir.path().join("test_manifest.avro");
884 let io = FileIO::new_with_fs();
885 let output_file = io.new_output(path.to_str().unwrap()).unwrap();
886 let mut writer = ManifestWriterBuilder::new(
887 output_file,
888 Some(2),
889 None,
890 metadata.schema.clone(),
891 metadata.partition_spec.clone(),
892 )
893 .build_v2_data();
894 for entry in &entries {
895 writer.add_entry(entry.clone()).unwrap();
896 }
897 writer.write_manifest_file().await.unwrap();
898
899 let actual_manifest =
901 Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
902 .unwrap();
903
904 let schema = Arc::new(
908 Schema::builder()
909 .with_fields(vec![
910 Arc::new(NestedField::optional(
911 1,
912 "id",
913 Type::Primitive(PrimitiveType::Long),
914 )),
915 Arc::new(NestedField::optional(
916 2,
917 "v_int",
918 Type::Primitive(PrimitiveType::Int),
919 )),
920 ])
921 .build()
922 .unwrap(),
923 );
924 let expected_manifest = Manifest {
925 metadata: ManifestMetadata {
926 schema_id: 0,
927 schema: schema.clone(),
928 partition_spec: PartitionSpec::builder(schema).with_spec_id(0).build().unwrap(),
929 content: ManifestContentType::Data,
930 format_version: FormatVersion::V2,
931 },
932 entries: vec![Arc::new(ManifestEntry {
933 status: ManifestStatus::Added,
934 snapshot_id: Some(2),
935 sequence_number: None,
936 file_sequence_number: None,
937 data_file: DataFile {
938 content: DataContentType::Data,
939 file_format: DataFileFormat::Parquet,
940 file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
941 partition: Struct::empty(),
942 record_count: 1,
943 file_size_in_bytes: 5442,
944 column_sizes: HashMap::from([
945 (1, 61),
946 (2, 73),
947 (3, 61),
948 ]),
949 value_counts: HashMap::default(),
950 null_value_counts: HashMap::default(),
951 nan_value_counts: HashMap::new(),
952 lower_bounds: HashMap::from([
953 (1, Datum::long(1)),
954 (2, Datum::int(2)),
955 ]),
956 upper_bounds: HashMap::from([
957 (1, Datum::long(1)),
958 (2, Datum::int(2)),
959 ]),
960 key_metadata: None,
961 split_offsets: Some(vec![4]),
962 equality_ids: None,
963 sort_order_id: None,
964 partition_spec_id: 0,
965 first_row_id: None,
966 referenced_data_file: None,
967 content_offset: None,
968 content_size_in_bytes: None,
969 },
970 })],
971 };
972
973 assert_eq!(actual_manifest, expected_manifest);
974 }
975
976 #[tokio::test]
977 async fn test_manifest_summary() {
978 let schema = Arc::new(
979 Schema::builder()
980 .with_fields(vec![
981 Arc::new(NestedField::optional(
982 1,
983 "time",
984 Type::Primitive(PrimitiveType::Date),
985 )),
986 Arc::new(NestedField::optional(
987 2,
988 "v_float",
989 Type::Primitive(PrimitiveType::Float),
990 )),
991 Arc::new(NestedField::optional(
992 3,
993 "v_double",
994 Type::Primitive(PrimitiveType::Double),
995 )),
996 ])
997 .build()
998 .unwrap(),
999 );
1000 let partition_spec = PartitionSpec::builder(schema.clone())
1001 .with_spec_id(0)
1002 .add_partition_field("time", "year_of_time", Transform::Year)
1003 .unwrap()
1004 .add_partition_field("v_float", "f", Transform::Identity)
1005 .unwrap()
1006 .add_partition_field("v_double", "d", Transform::Identity)
1007 .unwrap()
1008 .build()
1009 .unwrap();
1010 let metadata = ManifestMetadata {
1011 schema_id: 0,
1012 schema,
1013 partition_spec,
1014 content: ManifestContentType::Data,
1015 format_version: FormatVersion::V2,
1016 };
1017 let entries = vec![
1018 ManifestEntry {
1019 status: ManifestStatus::Added,
1020 snapshot_id: None,
1021 sequence_number: None,
1022 file_sequence_number: None,
1023 data_file: DataFile {
1024 content: DataContentType::Data,
1025 file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
1026 file_format: DataFileFormat::Parquet,
1027 partition: Struct::from_iter(
1028 vec![
1029 Some(Literal::int(2021)),
1030 Some(Literal::float(1.0)),
1031 Some(Literal::double(2.0)),
1032 ]
1033 ),
1034 record_count: 1,
1035 file_size_in_bytes: 5442,
1036 column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
1037 value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
1038 null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
1039 nan_value_counts: HashMap::new(),
1040 lower_bounds: HashMap::new(),
1041 upper_bounds: HashMap::new(),
1042 key_metadata: None,
1043 split_offsets: Some(vec![4]),
1044 equality_ids: None,
1045 sort_order_id: None,
1046 partition_spec_id: 0,
1047 first_row_id: None,
1048 referenced_data_file: None,
1049 content_offset: None,
1050 content_size_in_bytes: None,
1051 }
1052 },
1053 ManifestEntry {
1054 status: ManifestStatus::Added,
1055 snapshot_id: None,
1056 sequence_number: None,
1057 file_sequence_number: None,
1058 data_file: DataFile {
1059 content: DataContentType::Data,
1060 file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
1061 file_format: DataFileFormat::Parquet,
1062 partition: Struct::from_iter(
1063 vec![
1064 Some(Literal::int(1111)),
1065 Some(Literal::float(15.5)),
1066 Some(Literal::double(25.5)),
1067 ]
1068 ),
1069 record_count: 1,
1070 file_size_in_bytes: 5442,
1071 column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
1072 value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
1073 null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
1074 nan_value_counts: HashMap::new(),
1075 lower_bounds: HashMap::new(),
1076 upper_bounds: HashMap::new(),
1077 key_metadata: None,
1078 split_offsets: Some(vec![4]),
1079 equality_ids: None,
1080 sort_order_id: None,
1081 partition_spec_id: 0,
1082 first_row_id: None,
1083 referenced_data_file: None,
1084 content_offset: None,
1085 content_size_in_bytes: None,
1086 }
1087 },
1088 ManifestEntry {
1089 status: ManifestStatus::Added,
1090 snapshot_id: None,
1091 sequence_number: None,
1092 file_sequence_number: None,
1093 data_file: DataFile {
1094 content: DataContentType::Data,
1095 file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
1096 file_format: DataFileFormat::Parquet,
1097 partition: Struct::from_iter(
1098 vec![
1099 Some(Literal::int(1211)),
1100 Some(Literal::float(f32::NAN)),
1101 Some(Literal::double(1.0)),
1102 ]
1103 ),
1104 record_count: 1,
1105 file_size_in_bytes: 5442,
1106 column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
1107 value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
1108 null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
1109 nan_value_counts: HashMap::new(),
1110 lower_bounds: HashMap::new(),
1111 upper_bounds: HashMap::new(),
1112 key_metadata: None,
1113 split_offsets: Some(vec![4]),
1114 equality_ids: None,
1115 sort_order_id: None,
1116 partition_spec_id: 0,
1117 first_row_id: None,
1118 referenced_data_file: None,
1119 content_offset: None,
1120 content_size_in_bytes: None,
1121 }
1122 },
1123 ManifestEntry {
1124 status: ManifestStatus::Added,
1125 snapshot_id: None,
1126 sequence_number: None,
1127 file_sequence_number: None,
1128 data_file: DataFile {
1129 content: DataContentType::Data,
1130 file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
1131 file_format: DataFileFormat::Parquet,
1132 partition: Struct::from_iter(
1133 vec![
1134 Some(Literal::int(1111)),
1135 None,
1136 Some(Literal::double(11.0)),
1137 ]
1138 ),
1139 record_count: 1,
1140 file_size_in_bytes: 5442,
1141 column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
1142 value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
1143 null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
1144 nan_value_counts: HashMap::new(),
1145 lower_bounds: HashMap::new(),
1146 upper_bounds: HashMap::new(),
1147 key_metadata: None,
1148 split_offsets: Some(vec![4]),
1149 equality_ids: None,
1150 sort_order_id: None,
1151 partition_spec_id: 0,
1152 first_row_id: None,
1153 referenced_data_file: None,
1154 content_offset: None,
1155 content_size_in_bytes: None,
1156 }
1157 },
1158 ];
1159
1160 let tmp_dir = TempDir::new().unwrap();
1162 let path = tmp_dir.path().join("test_manifest.avro");
1163 let io = FileIO::new_with_fs();
1164 let output_file = io.new_output(path.to_str().unwrap()).unwrap();
1165 let mut writer = ManifestWriterBuilder::new(
1166 output_file,
1167 Some(1),
1168 None,
1169 metadata.schema.clone(),
1170 metadata.partition_spec.clone(),
1171 )
1172 .build_v2_data();
1173 for entry in &entries {
1174 writer.add_entry(entry.clone()).unwrap();
1175 }
1176 let res = writer.write_manifest_file().await.unwrap();
1177
1178 let partitions = res.partitions.unwrap();
1179
1180 assert_eq!(partitions.len(), 3);
1181 assert_eq!(
1182 partitions[0].clone().lower_bound.unwrap(),
1183 Datum::int(1111).to_bytes().unwrap()
1184 );
1185 assert_eq!(
1186 partitions[0].clone().upper_bound.unwrap(),
1187 Datum::int(2021).to_bytes().unwrap()
1188 );
1189 assert!(!partitions[0].clone().contains_null);
1190 assert_eq!(partitions[0].clone().contains_nan, Some(false));
1191
1192 assert_eq!(
1193 partitions[1].clone().lower_bound.unwrap(),
1194 Datum::float(1.0).to_bytes().unwrap()
1195 );
1196 assert_eq!(
1197 partitions[1].clone().upper_bound.unwrap(),
1198 Datum::float(15.5).to_bytes().unwrap()
1199 );
1200 assert!(partitions[1].clone().contains_null);
1201 assert_eq!(partitions[1].clone().contains_nan, Some(true));
1202
1203 assert_eq!(
1204 partitions[2].clone().lower_bound.unwrap(),
1205 Datum::double(1.0).to_bytes().unwrap()
1206 );
1207 assert_eq!(
1208 partitions[2].clone().upper_bound.unwrap(),
1209 Datum::double(25.5).to_bytes().unwrap()
1210 );
1211 assert!(!partitions[2].clone().contains_null);
1212 assert_eq!(partitions[2].clone().contains_nan, Some(false));
1213 }
1214
#[test]
fn test_data_file_serialization() {
    // Round-trip check: DataFile -> JSON -> DataFile, pinning the exact JSON
    // document produced in between.

    // Two required columns; `id` (field id 1) is also the identifier field.
    let id_field = NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long));
    let name_field = NestedField::required(2, "name", Type::Primitive(PrimitiveType::String));
    let table_schema = Schema::builder()
        .with_fields(vec![id_field.into(), name_field.into()])
        .with_identifier_field_ids(vec![1])
        .with_schema_id(1)
        .build()
        .unwrap();

    // Identity partitioning on `id`, registered under spec id 1.
    let spec = PartitionSpec::builder(table_schema.clone())
        .with_spec_id(1)
        .add_partition_field("id", "id_partition", Transform::Identity)
        .unwrap()
        .build()
        .unwrap();

    // Struct type of the partition tuple; needed by both JSON helpers below.
    let partition_struct = spec.partition_type(&table_schema).unwrap();

    // Two Parquet data files that differ in path, size, record count and
    // per-column metrics. Both carry an empty partition struct.
    let first_file = DataFileBuilder::default()
        .file_path("path/to/file1.parquet".to_string())
        .file_format(DataFileFormat::Parquet)
        .content(DataContentType::Data)
        .partition(Struct::empty())
        .partition_spec_id(1)
        .record_count(100)
        .file_size_in_bytes(1024)
        .column_sizes(HashMap::from([(1, 512), (2, 1024)]))
        .value_counts(HashMap::from([(1, 100), (2, 500)]))
        .null_value_counts(HashMap::from([(1, 0), (2, 1)]))
        .build()
        .unwrap();
    let second_file = DataFileBuilder::default()
        .file_path("path/to/file2.parquet".to_string())
        .file_format(DataFileFormat::Parquet)
        .content(DataContentType::Data)
        .partition(Struct::empty())
        .partition_spec_id(1)
        .record_count(200)
        .file_size_in_bytes(2048)
        .column_sizes(HashMap::from([(1, 1024), (2, 2048)]))
        .value_counts(HashMap::from([(1, 200), (2, 600)]))
        .null_value_counts(HashMap::from([(1, 10), (2, 999)]))
        .build()
        .unwrap();
    let originals = vec![first_file, second_file];

    // Serialize a copy of every file; the originals are kept for the final comparison.
    let json_docs: Vec<String> = originals
        .iter()
        .cloned()
        .map(|file| {
            serialize_data_file_to_json(file, &partition_struct, FormatVersion::V2).unwrap()
        })
        .collect();
    assert_eq!(json_docs.len(), 2);

    // Compare as parsed JSON values rather than raw strings, so textual
    // formatting of the documents does not affect the check.
    let actual_json: Vec<Value> = json_docs
        .iter()
        .map(|doc| serde_json::from_str(doc).unwrap())
        .collect();
    let expected_json = [
        serde_json::json!({
            "content": 0,
            "file_path": "path/to/file1.parquet",
            "file_format": "PARQUET",
            "partition": {},
            "record_count": 100,
            "file_size_in_bytes": 1024,
            "column_sizes": [
                { "key": 1, "value": 512 },
                { "key": 2, "value": 1024 }
            ],
            "value_counts": [
                { "key": 1, "value": 100 },
                { "key": 2, "value": 500 }
            ],
            "null_value_counts": [
                { "key": 1, "value": 0 },
                { "key": 2, "value": 1 }
            ],
            "nan_value_counts": [],
            "lower_bounds": [],
            "upper_bounds": [],
            "key_metadata": null,
            "split_offsets": null,
            "equality_ids": null,
            "sort_order_id": null,
            "first_row_id": null,
            "referenced_data_file": null,
            "content_offset": null,
            "content_size_in_bytes": null
        }),
        serde_json::json!({
            "content": 0,
            "file_path": "path/to/file2.parquet",
            "file_format": "PARQUET",
            "partition": {},
            "record_count": 200,
            "file_size_in_bytes": 2048,
            "column_sizes": [
                { "key": 1, "value": 1024 },
                { "key": 2, "value": 2048 }
            ],
            "value_counts": [
                { "key": 1, "value": 200 },
                { "key": 2, "value": 600 }
            ],
            "null_value_counts": [
                { "key": 1, "value": 10 },
                { "key": 2, "value": 999 }
            ],
            "nan_value_counts": [],
            "lower_bounds": [],
            "upper_bounds": [],
            "key_metadata": null,
            "split_offsets": null,
            "equality_ids": null,
            "sort_order_id": null,
            "first_row_id": null,
            "referenced_data_file": null,
            "content_offset": null,
            "content_size_in_bytes": null
        }),
    ];
    for (actual, expected) in actual_json.iter().zip(expected_json.iter()) {
        assert_eq!(actual, expected);
    }

    // Read every JSON document back using the same spec id, partition type and schema.
    let mut restored: Vec<DataFile> = Vec::with_capacity(json_docs.len());
    for doc in &json_docs {
        let file = deserialize_data_file_from_json(
            doc,
            spec.spec_id(),
            &partition_struct,
            &table_schema,
        )
        .unwrap();
        restored.push(file);
    }
    assert_eq!(restored.len(), 2);

    // Each restored file must equal the file it was serialized from.
    for (roundtripped, original) in restored.iter().zip(originals.iter()) {
        assert_eq!(roundtripped, original);
    }
}
1369}