iceberg/spec/manifest/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18mod _serde;
19
20mod data_file;
21pub use data_file::*;
22mod entry;
23pub use entry::*;
24mod metadata;
25pub use metadata::*;
26mod writer;
27use std::sync::Arc;
28
29use apache_avro::{Reader as AvroReader, from_value};
30pub use writer::*;
31
32use super::{
33    Datum, FormatVersion, ManifestContentType, PartitionSpec, PrimitiveType, Schema, Struct,
34    UNASSIGNED_SEQUENCE_NUMBER,
35};
36use crate::error::Result;
37use crate::{Error, ErrorKind};
38
/// A manifest contains metadata and a list of entries.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Manifest {
    // Manifest-level metadata: schema, partition spec, content type and format version.
    metadata: ManifestMetadata,
    // The manifest's entries, shared behind `ManifestEntryRef` (entries are wrapped
    // via `Arc::new` in `Manifest::new`).
    entries: Vec<ManifestEntryRef>,
}
45
46impl Manifest {
47    /// Parse manifest metadata and entries from bytes of avro file.
48    pub(crate) fn try_from_avro_bytes(bs: &[u8]) -> Result<(ManifestMetadata, Vec<ManifestEntry>)> {
49        let reader = AvroReader::new(bs)?;
50
51        // Parse manifest metadata
52        let meta = reader.user_metadata();
53        let metadata = ManifestMetadata::parse(meta)?;
54
55        // Parse manifest entries
56        let partition_type = metadata.partition_spec.partition_type(&metadata.schema)?;
57
58        let entries = match metadata.format_version {
59            FormatVersion::V1 => {
60                let schema = manifest_schema_v1(&partition_type)?;
61                let reader = AvroReader::with_schema(&schema, bs)?;
62                reader
63                    .into_iter()
64                    .map(|value| {
65                        from_value::<_serde::ManifestEntryV1>(&value?)?.try_into(
66                            metadata.partition_spec.spec_id(),
67                            &partition_type,
68                            &metadata.schema,
69                        )
70                    })
71                    .collect::<Result<Vec<_>>>()?
72            }
73            // Manifest Schema & Manifest Entry did not change between V2 and V3
74            FormatVersion::V2 | FormatVersion::V3 => {
75                let schema = manifest_schema_v2(&partition_type)?;
76                let reader = AvroReader::with_schema(&schema, bs)?;
77                reader
78                    .into_iter()
79                    .map(|value| {
80                        from_value::<_serde::ManifestEntryV2>(&value?)?.try_into(
81                            metadata.partition_spec.spec_id(),
82                            &partition_type,
83                            &metadata.schema,
84                        )
85                    })
86                    .collect::<Result<Vec<_>>>()?
87            }
88        };
89
90        Ok((metadata, entries))
91    }
92
93    /// Parse manifest from bytes of avro file.
94    pub fn parse_avro(bs: &[u8]) -> Result<Self> {
95        let (metadata, entries) = Self::try_from_avro_bytes(bs)?;
96        Ok(Self::new(metadata, entries))
97    }
98
99    /// Entries slice.
100    pub fn entries(&self) -> &[ManifestEntryRef] {
101        &self.entries
102    }
103
104    /// Get metadata.
105    pub fn metadata(&self) -> &ManifestMetadata {
106        &self.metadata
107    }
108
109    /// Consume this Manifest, returning its constituent parts
110    pub fn into_parts(self) -> (Vec<ManifestEntryRef>, ManifestMetadata) {
111        let Self { entries, metadata } = self;
112        (entries, metadata)
113    }
114
115    /// Constructor from [`ManifestMetadata`] and [`ManifestEntry`]s.
116    pub fn new(metadata: ManifestMetadata, entries: Vec<ManifestEntry>) -> Self {
117        Self {
118            metadata,
119            entries: entries.into_iter().map(Arc::new).collect(),
120        }
121    }
122}
123
124/// Serialize a DataFile to a JSON string.
125pub fn serialize_data_file_to_json(
126    data_file: DataFile,
127    partition_type: &super::StructType,
128    format_version: FormatVersion,
129) -> Result<String> {
130    let serde = _serde::DataFileSerde::try_from(data_file, partition_type, format_version)?;
131    serde_json::to_string(&serde).map_err(|e| {
132        Error::new(
133            ErrorKind::DataInvalid,
134            "Failed to serialize DataFile to JSON!".to_string(),
135        )
136        .with_source(e)
137    })
138}
139
140/// Deserialize a DataFile from a JSON string.
141pub fn deserialize_data_file_from_json(
142    json: &str,
143    partition_spec_id: i32,
144    partition_type: &super::StructType,
145    schema: &Schema,
146) -> Result<DataFile> {
147    let serde = serde_json::from_str::<_serde::DataFileSerde>(json).map_err(|e| {
148        Error::new(
149            ErrorKind::DataInvalid,
150            "Failed to deserialize JSON to DataFile!".to_string(),
151        )
152        .with_source(e)
153    })?;
154
155    serde.try_into(partition_spec_id, partition_type, schema)
156}
157
158#[cfg(test)]
159mod tests {
160    use std::collections::HashMap;
161    use std::fs;
162    use std::sync::Arc;
163
164    use serde_json::Value;
165    use tempfile::TempDir;
166
167    use super::*;
168    use crate::io::FileIOBuilder;
169    use crate::spec::{Literal, NestedField, PrimitiveType, Struct, Transform, Type};
170
    // Round-trip test: write a single added entry into an unpartitioned V2 data
    // manifest, read the avro file back with `Manifest::parse_avro`, and check
    // the parsed manifest equals the original (after the writer assigns the
    // snapshot id).
    #[tokio::test]
    async fn test_parse_manifest_v2_unpartition() {
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![
                    // id v_int v_long v_float v_double v_varchar v_bool v_date v_timestamp v_decimal v_ts_ntz
                    Arc::new(NestedField::optional(
                        1,
                        "id",
                        Type::Primitive(PrimitiveType::Long),
                    )),
                    Arc::new(NestedField::optional(
                        2,
                        "v_int",
                        Type::Primitive(PrimitiveType::Int),
                    )),
                    Arc::new(NestedField::optional(
                        3,
                        "v_long",
                        Type::Primitive(PrimitiveType::Long),
                    )),
                    Arc::new(NestedField::optional(
                        4,
                        "v_float",
                        Type::Primitive(PrimitiveType::Float),
                    )),
                    Arc::new(NestedField::optional(
                        5,
                        "v_double",
                        Type::Primitive(PrimitiveType::Double),
                    )),
                    Arc::new(NestedField::optional(
                        6,
                        "v_varchar",
                        Type::Primitive(PrimitiveType::String),
                    )),
                    Arc::new(NestedField::optional(
                        7,
                        "v_bool",
                        Type::Primitive(PrimitiveType::Boolean),
                    )),
                    Arc::new(NestedField::optional(
                        8,
                        "v_date",
                        Type::Primitive(PrimitiveType::Date),
                    )),
                    Arc::new(NestedField::optional(
                        9,
                        "v_timestamp",
                        Type::Primitive(PrimitiveType::Timestamptz),
                    )),
                    Arc::new(NestedField::optional(
                        10,
                        "v_decimal",
                        Type::Primitive(PrimitiveType::Decimal {
                            precision: 36,
                            scale: 10,
                        }),
                    )),
                    Arc::new(NestedField::optional(
                        11,
                        "v_ts_ntz",
                        Type::Primitive(PrimitiveType::Timestamp),
                    )),
                    Arc::new(NestedField::optional(
                        12,
                        "v_ts_ns_ntz",
                        Type::Primitive(PrimitiveType::TimestampNs),
                    )),
                ])
                .build()
                .unwrap(),
        );
        // Unpartitioned: the spec has no partition fields.
        let metadata = ManifestMetadata {
            schema_id: 0,
            schema: schema.clone(),
            partition_spec: PartitionSpec::builder(schema)
                .with_spec_id(0)
                .build()
                .unwrap(),
            content: ManifestContentType::Data,
            format_version: FormatVersion::V2,
        };
        let mut entries = vec![
                ManifestEntry {
                    status: ManifestStatus::Added,
                    snapshot_id: None,
                    sequence_number: None,
                    file_sequence_number: None,
                    data_file: DataFile {content:DataContentType::Data,file_path:"s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),file_format:DataFileFormat::Parquet,partition:Struct::empty(),record_count:1,file_size_in_bytes:5442,column_sizes:HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),value_counts:HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),null_value_counts:HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),nan_value_counts:HashMap::new(),lower_bounds:HashMap::new(),upper_bounds:HashMap::new(),key_metadata:None,split_offsets:vec![4],equality_ids:Some(Vec::new()),sort_order_id:None, partition_spec_id: 0,first_row_id: None,referenced_data_file: None,content_offset: None,content_size_in_bytes: None }
                }
            ];

        // write manifest to file
        let tmp_dir = TempDir::new().unwrap();
        let path = tmp_dir.path().join("test_manifest.avro");
        let io = FileIOBuilder::new_fs_io().build().unwrap();
        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
        let mut writer = ManifestWriterBuilder::new(
            output_file,
            Some(1),
            None,
            metadata.schema.clone(),
            metadata.partition_spec.clone(),
        )
        .build_v2_data();
        for entry in &entries {
            writer.add_entry(entry.clone()).unwrap();
        }
        writer.write_manifest_file().await.unwrap();

        // read back the manifest file and check the content
        let actual_manifest =
            Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
                .unwrap();
        // The snapshot id is assigned when the entry is added to the manifest.
        entries[0].snapshot_id = Some(1);
        assert_eq!(actual_manifest, Manifest::new(metadata, entries));
    }
290
    // Round-trip test for a V2 data manifest partitioned by identity transforms
    // on `v_int` and `v_long`. Also checks that a freshly written manifest file
    // reports UNASSIGNED_SEQUENCE_NUMBER for both sequence-number fields.
    #[tokio::test]
    async fn test_parse_manifest_v2_partition() {
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![
                    Arc::new(NestedField::optional(
                        1,
                        "id",
                        Type::Primitive(PrimitiveType::Long),
                    )),
                    Arc::new(NestedField::optional(
                        2,
                        "v_int",
                        Type::Primitive(PrimitiveType::Int),
                    )),
                    Arc::new(NestedField::optional(
                        3,
                        "v_long",
                        Type::Primitive(PrimitiveType::Long),
                    )),
                    Arc::new(NestedField::optional(
                        4,
                        "v_float",
                        Type::Primitive(PrimitiveType::Float),
                    )),
                    Arc::new(NestedField::optional(
                        5,
                        "v_double",
                        Type::Primitive(PrimitiveType::Double),
                    )),
                    Arc::new(NestedField::optional(
                        6,
                        "v_varchar",
                        Type::Primitive(PrimitiveType::String),
                    )),
                    Arc::new(NestedField::optional(
                        7,
                        "v_bool",
                        Type::Primitive(PrimitiveType::Boolean),
                    )),
                    Arc::new(NestedField::optional(
                        8,
                        "v_date",
                        Type::Primitive(PrimitiveType::Date),
                    )),
                    Arc::new(NestedField::optional(
                        9,
                        "v_timestamp",
                        Type::Primitive(PrimitiveType::Timestamptz),
                    )),
                    Arc::new(NestedField::optional(
                        10,
                        "v_decimal",
                        Type::Primitive(PrimitiveType::Decimal {
                            precision: 36,
                            scale: 10,
                        }),
                    )),
                    Arc::new(NestedField::optional(
                        11,
                        "v_ts_ntz",
                        Type::Primitive(PrimitiveType::Timestamp),
                    )),
                    Arc::new(NestedField::optional(
                        12,
                        "v_ts_ns_ntz",
                        Type::Primitive(PrimitiveType::TimestampNs),
                    )),
                ])
                .build()
                .unwrap(),
        );
        // Two identity partition fields; the entry below carries a matching
        // two-value partition struct (int 1, long 1000).
        let metadata = ManifestMetadata {
            schema_id: 0,
            schema: schema.clone(),
            partition_spec: PartitionSpec::builder(schema)
                .with_spec_id(0)
                .add_partition_field("v_int", "v_int", Transform::Identity)
                .unwrap()
                .add_partition_field("v_long", "v_long", Transform::Identity)
                .unwrap()
                .build()
                .unwrap(),
            content: ManifestContentType::Data,
            format_version: FormatVersion::V2,
        };
        let mut entries = vec![ManifestEntry {
                status: ManifestStatus::Added,
                snapshot_id: None,
                sequence_number: None,
                file_sequence_number: None,
                data_file: DataFile {
                    content: DataContentType::Data,
                    file_format: DataFileFormat::Parquet,
                    file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
                    partition: Struct::from_iter(
                        vec![
                            Some(Literal::int(1)),
                            Some(Literal::long(1000)),
                        ]
                            .into_iter()
                    ),
                    record_count: 1,
                    file_size_in_bytes: 5442,
                    column_sizes: HashMap::from([
                        (0, 73),
                        (6, 34),
                        (2, 73),
                        (7, 61),
                        (3, 61),
                        (5, 62),
                        (9, 79),
                        (10, 73),
                        (1, 61),
                        (4, 73),
                        (8, 73)
                    ]),
                    value_counts: HashMap::from([
                        (4, 1),
                        (5, 1),
                        (2, 1),
                        (0, 1),
                        (3, 1),
                        (6, 1),
                        (8, 1),
                        (1, 1),
                        (10, 1),
                        (7, 1),
                        (9, 1)
                    ]),
                    null_value_counts: HashMap::from([
                        (1, 0),
                        (6, 0),
                        (2, 0),
                        (8, 0),
                        (0, 0),
                        (3, 0),
                        (5, 0),
                        (9, 0),
                        (7, 0),
                        (4, 0),
                        (10, 0)
                    ]),
                    nan_value_counts: HashMap::new(),
                    lower_bounds: HashMap::new(),
                    upper_bounds: HashMap::new(),
                    key_metadata: None,
                    split_offsets: vec![4],
                    equality_ids: Some(Vec::new()),
                    sort_order_id: None,
                    partition_spec_id: 0,
                    first_row_id: None,
                    referenced_data_file: None,
                    content_offset: None,
                    content_size_in_bytes: None,
                },
            }];

        // write manifest to file and check the return manifest file.
        let tmp_dir = TempDir::new().unwrap();
        let path = tmp_dir.path().join("test_manifest.avro");
        let io = FileIOBuilder::new_fs_io().build().unwrap();
        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
        let mut writer = ManifestWriterBuilder::new(
            output_file,
            Some(2),
            None,
            metadata.schema.clone(),
            metadata.partition_spec.clone(),
        )
        .build_v2_data();
        for entry in &entries {
            writer.add_entry(entry.clone()).unwrap();
        }
        let manifest_file = writer.write_manifest_file().await.unwrap();
        // Sequence numbers are not assigned at write time; both stay unassigned.
        assert_eq!(manifest_file.sequence_number, UNASSIGNED_SEQUENCE_NUMBER);
        assert_eq!(
            manifest_file.min_sequence_number,
            UNASSIGNED_SEQUENCE_NUMBER
        );

        // read back the manifest file and check the content
        let actual_manifest =
            Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
                .unwrap();
        // The snapshot id is assigned when the entry is added to the manifest.
        entries[0].snapshot_id = Some(2);
        assert_eq!(actual_manifest, Manifest::new(metadata, entries));
    }
480
    // Round-trip test for an unpartitioned V1 manifest, including non-empty
    // lower/upper bounds, written with `build_v1` and parsed back.
    #[tokio::test]
    async fn test_parse_manifest_v1_unpartition() {
        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    Arc::new(NestedField::optional(
                        1,
                        "id",
                        Type::Primitive(PrimitiveType::Int),
                    )),
                    Arc::new(NestedField::optional(
                        2,
                        "data",
                        Type::Primitive(PrimitiveType::String),
                    )),
                    Arc::new(NestedField::optional(
                        3,
                        "comment",
                        Type::Primitive(PrimitiveType::String),
                    )),
                ])
                .build()
                .unwrap(),
        );
        let metadata = ManifestMetadata {
            schema_id: 1,
            schema: schema.clone(),
            partition_spec: PartitionSpec::builder(schema)
                .with_spec_id(0)
                .build()
                .unwrap(),
            content: ManifestContentType::Data,
            format_version: FormatVersion::V1,
        };
        // V1 entries carry explicit (non-None) snapshot/sequence numbers.
        let mut entries = vec![ManifestEntry {
                status: ManifestStatus::Added,
                snapshot_id: Some(0),
                sequence_number: Some(0),
                file_sequence_number: Some(0),
                data_file: DataFile {
                    content: DataContentType::Data,
                    file_path: "s3://testbucket/iceberg_data/iceberg_ctl/iceberg_db/iceberg_tbl/data/00000-7-45268d71-54eb-476c-b42c-942d880c04a1-00001.parquet".to_string(),
                    file_format: DataFileFormat::Parquet,
                    partition: Struct::empty(),
                    record_count: 1,
                    file_size_in_bytes: 875,
                    column_sizes: HashMap::from([(1,47),(2,48),(3,52)]),
                    value_counts: HashMap::from([(1,1),(2,1),(3,1)]),
                    null_value_counts: HashMap::from([(1,0),(2,0),(3,0)]),
                    nan_value_counts: HashMap::new(),
                    lower_bounds: HashMap::from([(1,Datum::int(1)),(2,Datum::string("a")),(3,Datum::string("AC/DC"))]),
                    upper_bounds: HashMap::from([(1,Datum::int(1)),(2,Datum::string("a")),(3,Datum::string("AC/DC"))]),
                    key_metadata: None,
                    split_offsets: vec![4],
                    equality_ids: None,
                    sort_order_id: Some(0),
                    partition_spec_id: 0,
                    first_row_id: None,
                    referenced_data_file: None,
                    content_offset: None,
                    content_size_in_bytes: None,
                }
            }];

        // write manifest to file
        let tmp_dir = TempDir::new().unwrap();
        let path = tmp_dir.path().join("test_manifest.avro");
        let io = FileIOBuilder::new_fs_io().build().unwrap();
        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
        let mut writer = ManifestWriterBuilder::new(
            output_file,
            Some(3),
            None,
            metadata.schema.clone(),
            metadata.partition_spec.clone(),
        )
        .build_v1();
        for entry in &entries {
            writer.add_entry(entry.clone()).unwrap();
        }
        writer.write_manifest_file().await.unwrap();

        // read back the manifest file and check the content
        let actual_manifest =
            Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
                .unwrap();
        // The snapshot id is assigned when the entry is added to the manifest.
        entries[0].snapshot_id = Some(3);
        assert_eq!(actual_manifest, Manifest::new(metadata, entries));
    }
572
    // Round-trip test for a V1 manifest partitioned by identity on `category`.
    // Also verifies the written manifest file's partition field summaries:
    // the single partition's lower/upper bounds both equal the bytes of "x".
    #[tokio::test]
    async fn test_parse_manifest_v1_partition() {
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![
                    Arc::new(NestedField::optional(
                        1,
                        "id",
                        Type::Primitive(PrimitiveType::Long),
                    )),
                    Arc::new(NestedField::optional(
                        2,
                        "data",
                        Type::Primitive(PrimitiveType::String),
                    )),
                    Arc::new(NestedField::optional(
                        3,
                        "category",
                        Type::Primitive(PrimitiveType::String),
                    )),
                ])
                .build()
                .unwrap(),
        );
        let metadata = ManifestMetadata {
            schema_id: 0,
            schema: schema.clone(),
            partition_spec: PartitionSpec::builder(schema)
                .add_partition_field("category", "category", Transform::Identity)
                .unwrap()
                .build()
                .unwrap(),
            content: ManifestContentType::Data,
            format_version: FormatVersion::V1,
        };
        let mut entries = vec![
                ManifestEntry {
                    status: ManifestStatus::Added,
                    snapshot_id: Some(0),
                    sequence_number: Some(0),
                    file_sequence_number: Some(0),
                    data_file: DataFile {
                        content: DataContentType::Data,
                        file_path: "s3://testbucket/prod/db/sample/data/category=x/00010-1-d5c93668-1e52-41ac-92a6-bba590cbf249-00001.parquet".to_string(),
                        file_format: DataFileFormat::Parquet,
                        partition: Struct::from_iter(
                            vec![
                                Some(
                                    Literal::string("x"),
                                ),
                            ]
                                .into_iter()
                        ),
                        record_count: 1,
                        file_size_in_bytes: 874,
                        column_sizes: HashMap::from([(1, 46), (2, 48), (3, 48)]),
                        value_counts: HashMap::from([(1, 1), (2, 1), (3, 1)]),
                        null_value_counts: HashMap::from([(1, 0), (2, 0), (3, 0)]),
                        nan_value_counts: HashMap::new(),
                        lower_bounds: HashMap::from([
                        (1, Datum::long(1)),
                        (2, Datum::string("a")),
                        (3, Datum::string("x"))
                        ]),
                        upper_bounds: HashMap::from([
                        (1, Datum::long(1)),
                        (2, Datum::string("a")),
                        (3, Datum::string("x"))
                        ]),
                        key_metadata: None,
                        split_offsets: vec![4],
                        equality_ids: None,
                        sort_order_id: Some(0),
                        partition_spec_id: 0,
                        first_row_id: None,
                        referenced_data_file: None,
                        content_offset: None,
                        content_size_in_bytes: None,
                    },
                }
            ];

        // write manifest to file
        let tmp_dir = TempDir::new().unwrap();
        let path = tmp_dir.path().join("test_manifest.avro");
        let io = FileIOBuilder::new_fs_io().build().unwrap();
        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
        let mut writer = ManifestWriterBuilder::new(
            output_file,
            Some(2),
            None,
            metadata.schema.clone(),
            metadata.partition_spec.clone(),
        )
        .build_v1();
        for entry in &entries {
            writer.add_entry(entry.clone()).unwrap();
        }
        let manifest_file = writer.write_manifest_file().await.unwrap();
        // One partition field -> one partition summary, bounded by "x" on both ends.
        let partitions = manifest_file.partitions.unwrap();
        assert_eq!(partitions.len(), 1);
        assert_eq!(
            partitions[0].clone().lower_bound.unwrap(),
            Datum::string("x").to_bytes().unwrap()
        );
        assert_eq!(
            partitions[0].clone().upper_bound.unwrap(),
            Datum::string("x").to_bytes().unwrap()
        );

        // read back the manifest file and check the content
        let actual_manifest =
            Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
                .unwrap();
        // The snapshot id is assigned when the entry is added to the manifest.
        entries[0].snapshot_id = Some(2);
        assert_eq!(actual_manifest, Manifest::new(metadata, entries));
    }
691
692    #[tokio::test]
693    async fn test_parse_manifest_with_schema_evolution() {
694        let schema = Arc::new(
695            Schema::builder()
696                .with_fields(vec![
697                    Arc::new(NestedField::optional(
698                        1,
699                        "id",
700                        Type::Primitive(PrimitiveType::Long),
701                    )),
702                    Arc::new(NestedField::optional(
703                        2,
704                        "v_int",
705                        Type::Primitive(PrimitiveType::Int),
706                    )),
707                ])
708                .build()
709                .unwrap(),
710        );
711        let metadata = ManifestMetadata {
712            schema_id: 0,
713            schema: schema.clone(),
714            partition_spec: PartitionSpec::builder(schema)
715                .with_spec_id(0)
716                .build()
717                .unwrap(),
718            content: ManifestContentType::Data,
719            format_version: FormatVersion::V2,
720        };
721        let entries = vec![ManifestEntry {
722                status: ManifestStatus::Added,
723                snapshot_id: None,
724                sequence_number: None,
725                file_sequence_number: None,
726                data_file: DataFile {
727                    content: DataContentType::Data,
728                    file_format: DataFileFormat::Parquet,
729                    file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
730                    partition: Struct::empty(),
731                    record_count: 1,
732                    file_size_in_bytes: 5442,
733                    column_sizes: HashMap::from([
734                        (1, 61),
735                        (2, 73),
736                        (3, 61),
737                    ]),
738                    value_counts: HashMap::default(),
739                    null_value_counts: HashMap::default(),
740                    nan_value_counts: HashMap::new(),
741                    lower_bounds: HashMap::from([
742                        (1, Datum::long(1)),
743                        (2, Datum::int(2)),
744                        (3, Datum::string("x"))
745                    ]),
746                    upper_bounds: HashMap::from([
747                        (1, Datum::long(1)),
748                        (2, Datum::int(2)),
749                        (3, Datum::string("x"))
750                    ]),
751                    key_metadata: None,
752                    split_offsets: vec![4],
753                    equality_ids: None,
754                    sort_order_id: None,
755                    partition_spec_id: 0,
756                    first_row_id: None,
757                    referenced_data_file: None,
758                    content_offset: None,
759                    content_size_in_bytes: None,
760                },
761            }];
762
763        // write manifest to file
764        let tmp_dir = TempDir::new().unwrap();
765        let path = tmp_dir.path().join("test_manifest.avro");
766        let io = FileIOBuilder::new_fs_io().build().unwrap();
767        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
768        let mut writer = ManifestWriterBuilder::new(
769            output_file,
770            Some(2),
771            None,
772            metadata.schema.clone(),
773            metadata.partition_spec.clone(),
774        )
775        .build_v2_data();
776        for entry in &entries {
777            writer.add_entry(entry.clone()).unwrap();
778        }
779        writer.write_manifest_file().await.unwrap();
780
781        // read back the manifest file and check the content
782        let actual_manifest =
783            Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
784                .unwrap();
785
786        // Compared with original manifest, the lower_bounds and upper_bounds no longer has data for field 3, and
787        // other parts should be same.
788        // The snapshot id is assigned when the entry is added to the manifest.
789        let schema = Arc::new(
790            Schema::builder()
791                .with_fields(vec![
792                    Arc::new(NestedField::optional(
793                        1,
794                        "id",
795                        Type::Primitive(PrimitiveType::Long),
796                    )),
797                    Arc::new(NestedField::optional(
798                        2,
799                        "v_int",
800                        Type::Primitive(PrimitiveType::Int),
801                    )),
802                ])
803                .build()
804                .unwrap(),
805        );
806        let expected_manifest = Manifest {
807            metadata: ManifestMetadata {
808                schema_id: 0,
809                schema: schema.clone(),
810                partition_spec: PartitionSpec::builder(schema).with_spec_id(0).build().unwrap(),
811                content: ManifestContentType::Data,
812                format_version: FormatVersion::V2,
813            },
814            entries: vec![Arc::new(ManifestEntry {
815                status: ManifestStatus::Added,
816                snapshot_id: Some(2),
817                sequence_number: None,
818                file_sequence_number: None,
819                data_file: DataFile {
820                    content: DataContentType::Data,
821                    file_format: DataFileFormat::Parquet,
822                    file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
823                    partition: Struct::empty(),
824                    record_count: 1,
825                    file_size_in_bytes: 5442,
826                    column_sizes: HashMap::from([
827                        (1, 61),
828                        (2, 73),
829                        (3, 61),
830                    ]),
831                    value_counts: HashMap::default(),
832                    null_value_counts: HashMap::default(),
833                    nan_value_counts: HashMap::new(),
834                    lower_bounds: HashMap::from([
835                        (1, Datum::long(1)),
836                        (2, Datum::int(2)),
837                    ]),
838                    upper_bounds: HashMap::from([
839                        (1, Datum::long(1)),
840                        (2, Datum::int(2)),
841                    ]),
842                    key_metadata: None,
843                    split_offsets: vec![4],
844                    equality_ids: None,
845                    sort_order_id: None,
846                    partition_spec_id: 0,
847                    first_row_id: None,
848                    referenced_data_file: None,
849                    content_offset: None,
850                    content_size_in_bytes: None,
851                },
852            })],
853        };
854
855        assert_eq!(actual_manifest, expected_manifest);
856    }
857
858    #[tokio::test]
859    async fn test_manifest_summary() {
860        let schema = Arc::new(
861            Schema::builder()
862                .with_fields(vec![
863                    Arc::new(NestedField::optional(
864                        1,
865                        "time",
866                        Type::Primitive(PrimitiveType::Date),
867                    )),
868                    Arc::new(NestedField::optional(
869                        2,
870                        "v_float",
871                        Type::Primitive(PrimitiveType::Float),
872                    )),
873                    Arc::new(NestedField::optional(
874                        3,
875                        "v_double",
876                        Type::Primitive(PrimitiveType::Double),
877                    )),
878                ])
879                .build()
880                .unwrap(),
881        );
882        let partition_spec = PartitionSpec::builder(schema.clone())
883            .with_spec_id(0)
884            .add_partition_field("time", "year_of_time", Transform::Year)
885            .unwrap()
886            .add_partition_field("v_float", "f", Transform::Identity)
887            .unwrap()
888            .add_partition_field("v_double", "d", Transform::Identity)
889            .unwrap()
890            .build()
891            .unwrap();
892        let metadata = ManifestMetadata {
893            schema_id: 0,
894            schema,
895            partition_spec,
896            content: ManifestContentType::Data,
897            format_version: FormatVersion::V2,
898        };
899        let entries = vec![
900            ManifestEntry {
901                status: ManifestStatus::Added,
902                snapshot_id: None,
903                sequence_number: None,
904                file_sequence_number: None,
905                data_file: DataFile {
906                    content: DataContentType::Data,
907                    file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
908                    file_format: DataFileFormat::Parquet,
909                    partition: Struct::from_iter(
910                        vec![
911                            Some(Literal::int(2021)),
912                            Some(Literal::float(1.0)),
913                            Some(Literal::double(2.0)),
914                        ]
915                    ),
916                    record_count: 1,
917                    file_size_in_bytes: 5442,
918                    column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
919                    value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
920                    null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
921                    nan_value_counts: HashMap::new(),
922                    lower_bounds: HashMap::new(),
923                    upper_bounds: HashMap::new(),
924                    key_metadata: None,
925                    split_offsets: vec![4],
926                    equality_ids: None,
927                    sort_order_id: None,
928                    partition_spec_id: 0,
929                    first_row_id: None,
930                    referenced_data_file: None,
931                    content_offset: None,
932                    content_size_in_bytes: None,
933                }
934            },
935                ManifestEntry {
936                    status: ManifestStatus::Added,
937                    snapshot_id: None,
938                    sequence_number: None,
939                    file_sequence_number: None,
940                    data_file: DataFile {
941                        content: DataContentType::Data,
942                        file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
943                        file_format: DataFileFormat::Parquet,
944                        partition: Struct::from_iter(
945                            vec![
946                                Some(Literal::int(1111)),
947                                Some(Literal::float(15.5)),
948                                Some(Literal::double(25.5)),
949                            ]
950                        ),
951                        record_count: 1,
952                        file_size_in_bytes: 5442,
953                        column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
954                        value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
955                        null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
956                        nan_value_counts: HashMap::new(),
957                        lower_bounds: HashMap::new(),
958                        upper_bounds: HashMap::new(),
959                        key_metadata: None,
960                        split_offsets: vec![4],
961                        equality_ids: None,
962                        sort_order_id: None,
963                        partition_spec_id: 0,
964                        first_row_id: None,
965                        referenced_data_file: None,
966                        content_offset: None,
967                        content_size_in_bytes: None,
968                    }
969                },
970                ManifestEntry {
971                    status: ManifestStatus::Added,
972                    snapshot_id: None,
973                    sequence_number: None,
974                    file_sequence_number: None,
975                    data_file: DataFile {
976                        content: DataContentType::Data,
977                        file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
978                        file_format: DataFileFormat::Parquet,
979                        partition: Struct::from_iter(
980                            vec![
981                                Some(Literal::int(1211)),
982                                Some(Literal::float(f32::NAN)),
983                                Some(Literal::double(1.0)),
984                            ]
985                        ),
986                        record_count: 1,
987                        file_size_in_bytes: 5442,
988                        column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
989                        value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
990                        null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
991                        nan_value_counts: HashMap::new(),
992                        lower_bounds: HashMap::new(),
993                        upper_bounds: HashMap::new(),
994                        key_metadata: None,
995                        split_offsets: vec![4],
996                        equality_ids: None,
997                        sort_order_id: None,
998                        partition_spec_id: 0,
999                        first_row_id: None,
1000                        referenced_data_file: None,
1001                        content_offset: None,
1002                        content_size_in_bytes: None,
1003                    }
1004                },
1005                ManifestEntry {
1006                    status: ManifestStatus::Added,
1007                    snapshot_id: None,
1008                    sequence_number: None,
1009                    file_sequence_number: None,
1010                    data_file: DataFile {
1011                        content: DataContentType::Data,
1012                        file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
1013                        file_format: DataFileFormat::Parquet,
1014                        partition: Struct::from_iter(
1015                            vec![
1016                                Some(Literal::int(1111)),
1017                                None,
1018                                Some(Literal::double(11.0)),
1019                            ]
1020                        ),
1021                        record_count: 1,
1022                        file_size_in_bytes: 5442,
1023                        column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
1024                        value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
1025                        null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
1026                        nan_value_counts: HashMap::new(),
1027                        lower_bounds: HashMap::new(),
1028                        upper_bounds: HashMap::new(),
1029                        key_metadata: None,
1030                        split_offsets: vec![4],
1031                        equality_ids: None,
1032                        sort_order_id: None,
1033                        partition_spec_id: 0,
1034                        first_row_id: None,
1035                        referenced_data_file: None,
1036                        content_offset: None,
1037                        content_size_in_bytes: None,
1038                    }
1039                },
1040        ];
1041
1042        // write manifest to file
1043        let tmp_dir = TempDir::new().unwrap();
1044        let path = tmp_dir.path().join("test_manifest.avro");
1045        let io = FileIOBuilder::new_fs_io().build().unwrap();
1046        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
1047        let mut writer = ManifestWriterBuilder::new(
1048            output_file,
1049            Some(1),
1050            None,
1051            metadata.schema.clone(),
1052            metadata.partition_spec.clone(),
1053        )
1054        .build_v2_data();
1055        for entry in &entries {
1056            writer.add_entry(entry.clone()).unwrap();
1057        }
1058        let res = writer.write_manifest_file().await.unwrap();
1059
1060        let partitions = res.partitions.unwrap();
1061
1062        assert_eq!(partitions.len(), 3);
1063        assert_eq!(
1064            partitions[0].clone().lower_bound.unwrap(),
1065            Datum::int(1111).to_bytes().unwrap()
1066        );
1067        assert_eq!(
1068            partitions[0].clone().upper_bound.unwrap(),
1069            Datum::int(2021).to_bytes().unwrap()
1070        );
1071        assert!(!partitions[0].clone().contains_null);
1072        assert_eq!(partitions[0].clone().contains_nan, Some(false));
1073
1074        assert_eq!(
1075            partitions[1].clone().lower_bound.unwrap(),
1076            Datum::float(1.0).to_bytes().unwrap()
1077        );
1078        assert_eq!(
1079            partitions[1].clone().upper_bound.unwrap(),
1080            Datum::float(15.5).to_bytes().unwrap()
1081        );
1082        assert!(partitions[1].clone().contains_null);
1083        assert_eq!(partitions[1].clone().contains_nan, Some(true));
1084
1085        assert_eq!(
1086            partitions[2].clone().lower_bound.unwrap(),
1087            Datum::double(1.0).to_bytes().unwrap()
1088        );
1089        assert_eq!(
1090            partitions[2].clone().upper_bound.unwrap(),
1091            Datum::double(25.5).to_bytes().unwrap()
1092        );
1093        assert!(!partitions[2].clone().contains_null);
1094        assert_eq!(partitions[2].clone().contains_nan, Some(false));
1095    }
1096
1097    #[test]
1098    fn test_data_file_serialization() {
1099        // Create a simple schema
1100        let schema = Schema::builder()
1101            .with_schema_id(1)
1102            .with_identifier_field_ids(vec![1])
1103            .with_fields(vec![
1104                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
1105                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
1106            ])
1107            .build()
1108            .unwrap();
1109
1110        // Create a partition spec
1111        let partition_spec = PartitionSpec::builder(schema.clone())
1112            .with_spec_id(1)
1113            .add_partition_field("id", "id_partition", Transform::Identity)
1114            .unwrap()
1115            .build()
1116            .unwrap();
1117
1118        // Get partition type from the partition spec
1119        let partition_type = partition_spec.partition_type(&schema).unwrap();
1120
1121        // Create a vector of DataFile objects
1122        let data_files = vec![
1123            DataFileBuilder::default()
1124                .content(DataContentType::Data)
1125                .file_format(DataFileFormat::Parquet)
1126                .file_path("path/to/file1.parquet".to_string())
1127                .file_size_in_bytes(1024)
1128                .record_count(100)
1129                .partition_spec_id(1)
1130                .partition(Struct::empty())
1131                .column_sizes(HashMap::from([(1, 512), (2, 1024)]))
1132                .value_counts(HashMap::from([(1, 100), (2, 500)]))
1133                .null_value_counts(HashMap::from([(1, 0), (2, 1)]))
1134                .build()
1135                .unwrap(),
1136            DataFileBuilder::default()
1137                .content(DataContentType::Data)
1138                .file_format(DataFileFormat::Parquet)
1139                .file_path("path/to/file2.parquet".to_string())
1140                .file_size_in_bytes(2048)
1141                .record_count(200)
1142                .partition_spec_id(1)
1143                .partition(Struct::empty())
1144                .column_sizes(HashMap::from([(1, 1024), (2, 2048)]))
1145                .value_counts(HashMap::from([(1, 200), (2, 600)]))
1146                .null_value_counts(HashMap::from([(1, 10), (2, 999)]))
1147                .build()
1148                .unwrap(),
1149        ];
1150
1151        // Serialize the DataFile objects
1152        let serialized_files = data_files
1153            .clone()
1154            .into_iter()
1155            .map(|f| serialize_data_file_to_json(f, &partition_type, FormatVersion::V2).unwrap())
1156            .collect::<Vec<String>>();
1157
1158        // Verify we have the expected serialized files
1159        assert_eq!(serialized_files.len(), 2);
1160        let pretty_json1: Value = serde_json::from_str(serialized_files.first().unwrap()).unwrap();
1161        let pretty_json2: Value = serde_json::from_str(serialized_files.get(1).unwrap()).unwrap();
1162        let expected_serialized_file1 = serde_json::json!({
1163            "content": 0,
1164            "file_path": "path/to/file1.parquet",
1165            "file_format": "PARQUET",
1166            "partition": {},
1167            "record_count": 100,
1168            "file_size_in_bytes": 1024,
1169            "column_sizes": [
1170                { "key": 1, "value": 512 },
1171                { "key": 2, "value": 1024 }
1172            ],
1173            "value_counts": [
1174                { "key": 1, "value": 100 },
1175                { "key": 2, "value": 500 }
1176            ],
1177            "null_value_counts": [
1178                { "key": 1, "value": 0 },
1179                { "key": 2, "value": 1 }
1180            ],
1181            "nan_value_counts": [],
1182            "lower_bounds": [],
1183            "upper_bounds": [],
1184            "key_metadata": null,
1185            "split_offsets": [],
1186            "equality_ids": null,
1187            "sort_order_id": null,
1188            "first_row_id": null,
1189            "referenced_data_file": null,
1190            "content_offset": null,
1191            "content_size_in_bytes": null
1192        });
1193        let expected_serialized_file2 = serde_json::json!({
1194            "content": 0,
1195            "file_path": "path/to/file2.parquet",
1196            "file_format": "PARQUET",
1197            "partition": {},
1198            "record_count": 200,
1199            "file_size_in_bytes": 2048,
1200            "column_sizes": [
1201                { "key": 1, "value": 1024 },
1202                { "key": 2, "value": 2048 }
1203            ],
1204            "value_counts": [
1205                { "key": 1, "value": 200 },
1206                { "key": 2, "value": 600 }
1207            ],
1208            "null_value_counts": [
1209                { "key": 1, "value": 10 },
1210                { "key": 2, "value": 999 }
1211            ],
1212            "nan_value_counts": [],
1213            "lower_bounds": [],
1214            "upper_bounds": [],
1215            "key_metadata": null,
1216            "split_offsets": [],
1217            "equality_ids": null,
1218            "sort_order_id": null,
1219            "first_row_id": null,
1220            "referenced_data_file": null,
1221            "content_offset": null,
1222            "content_size_in_bytes": null
1223        });
1224        assert_eq!(pretty_json1, expected_serialized_file1);
1225        assert_eq!(pretty_json2, expected_serialized_file2);
1226
1227        // Now deserialize the JSON strings back into DataFile objects
1228        let deserialized_files: Vec<DataFile> = serialized_files
1229            .into_iter()
1230            .map(|json| {
1231                deserialize_data_file_from_json(
1232                    &json,
1233                    partition_spec.spec_id(),
1234                    &partition_type,
1235                    &schema,
1236                )
1237                .unwrap()
1238            })
1239            .collect();
1240
1241        // Verify we have the expected number of deserialized files
1242        assert_eq!(deserialized_files.len(), 2);
1243        let deserialized_data_file1 = deserialized_files.first().unwrap();
1244        let deserialized_data_file2 = deserialized_files.get(1).unwrap();
1245        let original_data_file1 = data_files.first().unwrap();
1246        let original_data_file2 = data_files.get(1).unwrap();
1247
1248        assert_eq!(deserialized_data_file1, original_data_file1);
1249        assert_eq!(deserialized_data_file2, original_data_file2);
1250    }
1251}