iceberg/spec/manifest/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18mod _serde;
19
20mod data_file;
21pub use data_file::*;
22mod entry;
23pub use entry::*;
24mod metadata;
25pub use metadata::*;
26mod writer;
27use std::sync::Arc;
28
29use apache_avro::{Reader as AvroReader, from_value};
30pub use writer::*;
31
32use super::{
33    Datum, FormatVersion, ManifestContentType, PartitionSpec, PrimitiveType, Schema, Struct,
34    UNASSIGNED_SEQUENCE_NUMBER,
35};
36use crate::error::Result;
37use crate::{Error, ErrorKind};
38
39/// A manifest contains metadata and a list of entries.
40#[derive(Debug, PartialEq, Eq, Clone)]
41pub struct Manifest {
42    metadata: ManifestMetadata,
43    entries: Vec<ManifestEntryRef>,
44}
45
46impl Manifest {
47    /// Parse manifest metadata and entries from bytes of avro file.
48    pub(crate) fn try_from_avro_bytes(bs: &[u8]) -> Result<(ManifestMetadata, Vec<ManifestEntry>)> {
49        let reader = AvroReader::new(bs)?;
50
51        // Parse manifest metadata
52        let meta = reader.user_metadata();
53        let metadata = ManifestMetadata::parse(meta)?;
54
55        // Parse manifest entries
56        let partition_type = metadata.partition_spec.partition_type(&metadata.schema)?;
57
58        let entries = match metadata.format_version {
59            FormatVersion::V1 => {
60                let schema = manifest_schema_v1(&partition_type)?;
61                let reader = AvroReader::with_schema(&schema, bs)?;
62                reader
63                    .into_iter()
64                    .map(|value| {
65                        from_value::<_serde::ManifestEntryV1>(&value?)?.try_into(
66                            metadata.partition_spec.spec_id(),
67                            &partition_type,
68                            &metadata.schema,
69                        )
70                    })
71                    .collect::<Result<Vec<_>>>()?
72            }
73            // Manifest Schema & Manifest Entry did not change between V2 and V3
74            FormatVersion::V2 | FormatVersion::V3 => {
75                let schema = manifest_schema_v2(&partition_type)?;
76                let reader = AvroReader::with_schema(&schema, bs)?;
77                reader
78                    .into_iter()
79                    .map(|value| {
80                        from_value::<_serde::ManifestEntryV2>(&value?)?.try_into(
81                            metadata.partition_spec.spec_id(),
82                            &partition_type,
83                            &metadata.schema,
84                        )
85                    })
86                    .collect::<Result<Vec<_>>>()?
87            }
88        };
89
90        Ok((metadata, entries))
91    }
92
93    /// Parse manifest from bytes of avro file.
94    pub fn parse_avro(bs: &[u8]) -> Result<Self> {
95        let (metadata, entries) = Self::try_from_avro_bytes(bs)?;
96        Ok(Self::new(metadata, entries))
97    }
98
99    /// Entries slice.
100    pub fn entries(&self) -> &[ManifestEntryRef] {
101        &self.entries
102    }
103
104    /// Get metadata.
105    pub fn metadata(&self) -> &ManifestMetadata {
106        &self.metadata
107    }
108
109    /// Consume this Manifest, returning its constituent parts
110    pub fn into_parts(self) -> (Vec<ManifestEntryRef>, ManifestMetadata) {
111        let Self { entries, metadata } = self;
112        (entries, metadata)
113    }
114
115    /// Constructor from [`ManifestMetadata`] and [`ManifestEntry`]s.
116    pub fn new(metadata: ManifestMetadata, entries: Vec<ManifestEntry>) -> Self {
117        Self {
118            metadata,
119            entries: entries.into_iter().map(Arc::new).collect(),
120        }
121    }
122}
123
124/// Serialize a DataFile to a JSON string.
125pub fn serialize_data_file_to_json(
126    data_file: DataFile,
127    partition_type: &super::StructType,
128    format_version: FormatVersion,
129) -> Result<String> {
130    let serde = _serde::DataFileSerde::try_from(data_file, partition_type, format_version)?;
131    serde_json::to_string(&serde).map_err(|e| {
132        Error::new(
133            ErrorKind::DataInvalid,
134            "Failed to serialize DataFile to JSON!".to_string(),
135        )
136        .with_source(e)
137    })
138}
139
140/// Deserialize a DataFile from a JSON string.
141pub fn deserialize_data_file_from_json(
142    json: &str,
143    partition_spec_id: i32,
144    partition_type: &super::StructType,
145    schema: &Schema,
146) -> Result<DataFile> {
147    let serde = serde_json::from_str::<_serde::DataFileSerde>(json).map_err(|e| {
148        Error::new(
149            ErrorKind::DataInvalid,
150            "Failed to deserialize JSON to DataFile!".to_string(),
151        )
152        .with_source(e)
153    })?;
154
155    serde.try_into(partition_spec_id, partition_type, schema)
156}
157
158#[cfg(test)]
159mod tests {
160    use std::collections::HashMap;
161    use std::fs;
162    use std::sync::Arc;
163
164    use apache_avro::{Codec, Writer, to_value};
165    use serde_json::{Value, to_vec};
166    use tempfile::TempDir;
167
168    use super::*;
169    use crate::io::FileIO;
170    use crate::spec::{Literal, NestedField, PrimitiveType, Struct, Transform, Type};
171
172    #[tokio::test]
173    async fn test_parse_manifest_v2_unpartition() {
174        let schema = Arc::new(
175            Schema::builder()
176                .with_fields(vec![
177                    // id v_int v_long v_float v_double v_varchar v_bool v_date v_timestamp v_decimal v_ts_ntz
178                    Arc::new(NestedField::optional(
179                        1,
180                        "id",
181                        Type::Primitive(PrimitiveType::Long),
182                    )),
183                    Arc::new(NestedField::optional(
184                        2,
185                        "v_int",
186                        Type::Primitive(PrimitiveType::Int),
187                    )),
188                    Arc::new(NestedField::optional(
189                        3,
190                        "v_long",
191                        Type::Primitive(PrimitiveType::Long),
192                    )),
193                    Arc::new(NestedField::optional(
194                        4,
195                        "v_float",
196                        Type::Primitive(PrimitiveType::Float),
197                    )),
198                    Arc::new(NestedField::optional(
199                        5,
200                        "v_double",
201                        Type::Primitive(PrimitiveType::Double),
202                    )),
203                    Arc::new(NestedField::optional(
204                        6,
205                        "v_varchar",
206                        Type::Primitive(PrimitiveType::String),
207                    )),
208                    Arc::new(NestedField::optional(
209                        7,
210                        "v_bool",
211                        Type::Primitive(PrimitiveType::Boolean),
212                    )),
213                    Arc::new(NestedField::optional(
214                        8,
215                        "v_date",
216                        Type::Primitive(PrimitiveType::Date),
217                    )),
218                    Arc::new(NestedField::optional(
219                        9,
220                        "v_timestamp",
221                        Type::Primitive(PrimitiveType::Timestamptz),
222                    )),
223                    Arc::new(NestedField::optional(
224                        10,
225                        "v_decimal",
226                        Type::Primitive(PrimitiveType::Decimal {
227                            precision: 36,
228                            scale: 10,
229                        }),
230                    )),
231                    Arc::new(NestedField::optional(
232                        11,
233                        "v_ts_ntz",
234                        Type::Primitive(PrimitiveType::Timestamp),
235                    )),
236                    Arc::new(NestedField::optional(
237                        12,
238                        "v_ts_ns_ntz",
239                        Type::Primitive(PrimitiveType::TimestampNs),
240                    )),
241                ])
242                .build()
243                .unwrap(),
244        );
245        let metadata = ManifestMetadata {
246            schema_id: 0,
247            schema: schema.clone(),
248            partition_spec: PartitionSpec::builder(schema)
249                .with_spec_id(0)
250                .build()
251                .unwrap(),
252            content: ManifestContentType::Data,
253            format_version: FormatVersion::V2,
254        };
255        let mut entries = vec![
256                ManifestEntry {
257                    status: ManifestStatus::Added,
258                    snapshot_id: None,
259                    sequence_number: None,
260                    file_sequence_number: None,
261                    data_file: DataFile {content:DataContentType::Data,file_path:"s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),file_format:DataFileFormat::Parquet,partition:Struct::empty(),record_count:1,file_size_in_bytes:5442,column_sizes:HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),value_counts:HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),null_value_counts:HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),nan_value_counts:HashMap::new(),lower_bounds:HashMap::new(),upper_bounds:HashMap::new(),key_metadata:None,split_offsets:Some(vec![4]),equality_ids:Some(Vec::new()),sort_order_id:None, partition_spec_id: 0,first_row_id: None,referenced_data_file: None,content_offset: None,content_size_in_bytes: None }
262                }
263            ];
264
265        // write manifest to file
266        let tmp_dir = TempDir::new().unwrap();
267        let path = tmp_dir.path().join("test_manifest.avro");
268        let io = FileIO::new_with_fs();
269        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
270        let mut writer = ManifestWriterBuilder::new(
271            output_file,
272            Some(1),
273            None,
274            metadata.schema.clone(),
275            metadata.partition_spec.clone(),
276        )
277        .build_v2_data();
278        for entry in &entries {
279            writer.add_entry(entry.clone()).unwrap();
280        }
281        writer.write_manifest_file().await.unwrap();
282
283        // read back the manifest file and check the content
284        let actual_manifest =
285            Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
286                .unwrap();
287        // The snapshot id is assigned when the entry is added to the manifest.
288        entries[0].snapshot_id = Some(1);
289        assert_eq!(actual_manifest, Manifest::new(metadata, entries));
290    }
291
292    #[test]
293    fn test_parse_snappy_manifest_v2() {
294        let schema = Arc::new(
295            Schema::builder()
296                .with_fields(vec![Arc::new(NestedField::optional(
297                    1,
298                    "id",
299                    Type::Primitive(PrimitiveType::Long),
300                ))])
301                .build()
302                .unwrap(),
303        );
304        let partition_spec = PartitionSpec::builder(schema.clone())
305            .with_spec_id(0)
306            .build()
307            .unwrap();
308
309        for (manifest_content, file_content, file_path) in [
310            (
311                ManifestContentType::Data,
312                DataContentType::Data,
313                "s3://bucket/table/data/data.parquet",
314            ),
315            (
316                ManifestContentType::Deletes,
317                DataContentType::PositionDeletes,
318                "s3://bucket/table/data/delete.parquet",
319            ),
320        ] {
321            let metadata = ManifestMetadata {
322                schema_id: 0,
323                schema: schema.clone(),
324                partition_spec: partition_spec.clone(),
325                content: manifest_content,
326                format_version: FormatVersion::V2,
327            };
328            let entry = ManifestEntry {
329                status: ManifestStatus::Added,
330                snapshot_id: Some(1),
331                sequence_number: None,
332                file_sequence_number: None,
333                data_file: DataFile {
334                    content: file_content,
335                    file_path: file_path.to_string(),
336                    file_format: DataFileFormat::Parquet,
337                    partition: Struct::empty(),
338                    record_count: 1,
339                    file_size_in_bytes: 1024,
340                    column_sizes: HashMap::new(),
341                    value_counts: HashMap::new(),
342                    null_value_counts: HashMap::new(),
343                    nan_value_counts: HashMap::new(),
344                    lower_bounds: HashMap::new(),
345                    upper_bounds: HashMap::new(),
346                    key_metadata: None,
347                    split_offsets: None,
348                    equality_ids: None,
349                    sort_order_id: None,
350                    partition_spec_id: 0,
351                    first_row_id: None,
352                    referenced_data_file: None,
353                    content_offset: None,
354                    content_size_in_bytes: None,
355                },
356            };
357
358            let partition_type = metadata
359                .partition_spec
360                .partition_type(&metadata.schema)
361                .unwrap();
362            let avro_schema = manifest_schema_v2(&partition_type).unwrap();
363            let mut writer = Writer::with_codec(&avro_schema, Vec::new(), Codec::Snappy);
364            writer
365                .add_user_metadata("schema".to_string(), to_vec(&metadata.schema).unwrap())
366                .unwrap();
367            writer
368                .add_user_metadata(
369                    "schema-id".to_string(),
370                    metadata.schema.schema_id().to_string(),
371                )
372                .unwrap();
373            writer
374                .add_user_metadata(
375                    "partition-spec".to_string(),
376                    to_vec(&metadata.partition_spec.fields()).unwrap(),
377                )
378                .unwrap();
379            writer
380                .add_user_metadata(
381                    "partition-spec-id".to_string(),
382                    metadata.partition_spec.spec_id().to_string(),
383                )
384                .unwrap();
385            writer
386                .add_user_metadata(
387                    "format-version".to_string(),
388                    (metadata.format_version as u8).to_string(),
389                )
390                .unwrap();
391            writer
392                .add_user_metadata("content".to_string(), metadata.content.to_string())
393                .unwrap();
394            let value = to_value(
395                _serde::ManifestEntryV2::try_from(entry.clone(), &partition_type).unwrap(),
396            )
397            .unwrap()
398            .resolve(&avro_schema)
399            .unwrap();
400            writer.append(value).unwrap();
401            let bs = writer.into_inner().unwrap();
402
403            let parsed_manifest = Manifest::parse_avro(&bs).unwrap();
404
405            assert_eq!(parsed_manifest, Manifest::new(metadata, vec![entry]));
406        }
407    }
408
409    #[tokio::test]
410    async fn test_parse_manifest_v2_partition() {
411        let schema = Arc::new(
412            Schema::builder()
413                .with_fields(vec![
414                    Arc::new(NestedField::optional(
415                        1,
416                        "id",
417                        Type::Primitive(PrimitiveType::Long),
418                    )),
419                    Arc::new(NestedField::optional(
420                        2,
421                        "v_int",
422                        Type::Primitive(PrimitiveType::Int),
423                    )),
424                    Arc::new(NestedField::optional(
425                        3,
426                        "v_long",
427                        Type::Primitive(PrimitiveType::Long),
428                    )),
429                    Arc::new(NestedField::optional(
430                        4,
431                        "v_float",
432                        Type::Primitive(PrimitiveType::Float),
433                    )),
434                    Arc::new(NestedField::optional(
435                        5,
436                        "v_double",
437                        Type::Primitive(PrimitiveType::Double),
438                    )),
439                    Arc::new(NestedField::optional(
440                        6,
441                        "v_varchar",
442                        Type::Primitive(PrimitiveType::String),
443                    )),
444                    Arc::new(NestedField::optional(
445                        7,
446                        "v_bool",
447                        Type::Primitive(PrimitiveType::Boolean),
448                    )),
449                    Arc::new(NestedField::optional(
450                        8,
451                        "v_date",
452                        Type::Primitive(PrimitiveType::Date),
453                    )),
454                    Arc::new(NestedField::optional(
455                        9,
456                        "v_timestamp",
457                        Type::Primitive(PrimitiveType::Timestamptz),
458                    )),
459                    Arc::new(NestedField::optional(
460                        10,
461                        "v_decimal",
462                        Type::Primitive(PrimitiveType::Decimal {
463                            precision: 36,
464                            scale: 10,
465                        }),
466                    )),
467                    Arc::new(NestedField::optional(
468                        11,
469                        "v_ts_ntz",
470                        Type::Primitive(PrimitiveType::Timestamp),
471                    )),
472                    Arc::new(NestedField::optional(
473                        12,
474                        "v_ts_ns_ntz",
475                        Type::Primitive(PrimitiveType::TimestampNs),
476                    )),
477                ])
478                .build()
479                .unwrap(),
480        );
481        let metadata = ManifestMetadata {
482            schema_id: 0,
483            schema: schema.clone(),
484            partition_spec: PartitionSpec::builder(schema)
485                .with_spec_id(0)
486                .add_partition_field("v_int", "v_int", Transform::Identity)
487                .unwrap()
488                .add_partition_field("v_long", "v_long", Transform::Identity)
489                .unwrap()
490                .build()
491                .unwrap(),
492            content: ManifestContentType::Data,
493            format_version: FormatVersion::V2,
494        };
495        let mut entries = vec![ManifestEntry {
496                status: ManifestStatus::Added,
497                snapshot_id: None,
498                sequence_number: None,
499                file_sequence_number: None,
500                data_file: DataFile {
501                    content: DataContentType::Data,
502                    file_format: DataFileFormat::Parquet,
503                    file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
504                    partition: Struct::from_iter(
505                        vec![
506                            Some(Literal::int(1)),
507                            Some(Literal::long(1000)),
508                        ]
509                            .into_iter()
510                    ),
511                    record_count: 1,
512                    file_size_in_bytes: 5442,
513                    column_sizes: HashMap::from([
514                        (0, 73),
515                        (6, 34),
516                        (2, 73),
517                        (7, 61),
518                        (3, 61),
519                        (5, 62),
520                        (9, 79),
521                        (10, 73),
522                        (1, 61),
523                        (4, 73),
524                        (8, 73)
525                    ]),
526                    value_counts: HashMap::from([
527                        (4, 1),
528                        (5, 1),
529                        (2, 1),
530                        (0, 1),
531                        (3, 1),
532                        (6, 1),
533                        (8, 1),
534                        (1, 1),
535                        (10, 1),
536                        (7, 1),
537                        (9, 1)
538                    ]),
539                    null_value_counts: HashMap::from([
540                        (1, 0),
541                        (6, 0),
542                        (2, 0),
543                        (8, 0),
544                        (0, 0),
545                        (3, 0),
546                        (5, 0),
547                        (9, 0),
548                        (7, 0),
549                        (4, 0),
550                        (10, 0)
551                    ]),
552                    nan_value_counts: HashMap::new(),
553                    lower_bounds: HashMap::new(),
554                    upper_bounds: HashMap::new(),
555                    key_metadata: None,
556                    split_offsets: Some(vec![4]),
557                    equality_ids: Some(Vec::new()),
558                    sort_order_id: None,
559                    partition_spec_id: 0,
560                    first_row_id: None,
561                    referenced_data_file: None,
562                    content_offset: None,
563                    content_size_in_bytes: None,
564                },
565            }];
566
567        // write manifest to file and check the return manifest file.
568        let tmp_dir = TempDir::new().unwrap();
569        let path = tmp_dir.path().join("test_manifest.avro");
570        let io = FileIO::new_with_fs();
571        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
572        let mut writer = ManifestWriterBuilder::new(
573            output_file,
574            Some(2),
575            None,
576            metadata.schema.clone(),
577            metadata.partition_spec.clone(),
578        )
579        .build_v2_data();
580        for entry in &entries {
581            writer.add_entry(entry.clone()).unwrap();
582        }
583        let manifest_file = writer.write_manifest_file().await.unwrap();
584        assert_eq!(manifest_file.sequence_number, UNASSIGNED_SEQUENCE_NUMBER);
585        assert_eq!(
586            manifest_file.min_sequence_number,
587            UNASSIGNED_SEQUENCE_NUMBER
588        );
589
590        // read back the manifest file and check the content
591        let actual_manifest =
592            Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
593                .unwrap();
594        // The snapshot id is assigned when the entry is added to the manifest.
595        entries[0].snapshot_id = Some(2);
596        assert_eq!(actual_manifest, Manifest::new(metadata, entries));
597    }
598
599    #[tokio::test]
600    async fn test_parse_manifest_v1_unpartition() {
601        let schema = Arc::new(
602            Schema::builder()
603                .with_schema_id(1)
604                .with_fields(vec![
605                    Arc::new(NestedField::optional(
606                        1,
607                        "id",
608                        Type::Primitive(PrimitiveType::Int),
609                    )),
610                    Arc::new(NestedField::optional(
611                        2,
612                        "data",
613                        Type::Primitive(PrimitiveType::String),
614                    )),
615                    Arc::new(NestedField::optional(
616                        3,
617                        "comment",
618                        Type::Primitive(PrimitiveType::String),
619                    )),
620                ])
621                .build()
622                .unwrap(),
623        );
624        let metadata = ManifestMetadata {
625            schema_id: 1,
626            schema: schema.clone(),
627            partition_spec: PartitionSpec::builder(schema)
628                .with_spec_id(0)
629                .build()
630                .unwrap(),
631            content: ManifestContentType::Data,
632            format_version: FormatVersion::V1,
633        };
634        let mut entries = vec![ManifestEntry {
635                status: ManifestStatus::Added,
636                snapshot_id: Some(0),
637                sequence_number: Some(0),
638                file_sequence_number: Some(0),
639                data_file: DataFile {
640                    content: DataContentType::Data,
641                    file_path: "s3://testbucket/iceberg_data/iceberg_ctl/iceberg_db/iceberg_tbl/data/00000-7-45268d71-54eb-476c-b42c-942d880c04a1-00001.parquet".to_string(),
642                    file_format: DataFileFormat::Parquet,
643                    partition: Struct::empty(),
644                    record_count: 1,
645                    file_size_in_bytes: 875,
646                    column_sizes: HashMap::from([(1,47),(2,48),(3,52)]),
647                    value_counts: HashMap::from([(1,1),(2,1),(3,1)]),
648                    null_value_counts: HashMap::from([(1,0),(2,0),(3,0)]),
649                    nan_value_counts: HashMap::new(),
650                    lower_bounds: HashMap::from([(1,Datum::int(1)),(2,Datum::string("a")),(3,Datum::string("AC/DC"))]),
651                    upper_bounds: HashMap::from([(1,Datum::int(1)),(2,Datum::string("a")),(3,Datum::string("AC/DC"))]),
652                    key_metadata: None,
653                    split_offsets: Some(vec![4]),
654                    equality_ids: None,
655                    sort_order_id: Some(0),
656                    partition_spec_id: 0,
657                    first_row_id: None,
658                    referenced_data_file: None,
659                    content_offset: None,
660                    content_size_in_bytes: None,
661                }
662            }];
663
664        // write manifest to file
665        let tmp_dir = TempDir::new().unwrap();
666        let path = tmp_dir.path().join("test_manifest.avro");
667        let io = FileIO::new_with_fs();
668        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
669        let mut writer = ManifestWriterBuilder::new(
670            output_file,
671            Some(3),
672            None,
673            metadata.schema.clone(),
674            metadata.partition_spec.clone(),
675        )
676        .build_v1();
677        for entry in &entries {
678            writer.add_entry(entry.clone()).unwrap();
679        }
680        writer.write_manifest_file().await.unwrap();
681
682        // read back the manifest file and check the content
683        let actual_manifest =
684            Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
685                .unwrap();
686        // The snapshot id is assigned when the entry is added to the manifest.
687        entries[0].snapshot_id = Some(3);
688        assert_eq!(actual_manifest, Manifest::new(metadata, entries));
689    }
690
691    #[tokio::test]
692    async fn test_parse_manifest_v1_partition() {
693        let schema = Arc::new(
694            Schema::builder()
695                .with_fields(vec![
696                    Arc::new(NestedField::optional(
697                        1,
698                        "id",
699                        Type::Primitive(PrimitiveType::Long),
700                    )),
701                    Arc::new(NestedField::optional(
702                        2,
703                        "data",
704                        Type::Primitive(PrimitiveType::String),
705                    )),
706                    Arc::new(NestedField::optional(
707                        3,
708                        "category",
709                        Type::Primitive(PrimitiveType::String),
710                    )),
711                ])
712                .build()
713                .unwrap(),
714        );
715        let metadata = ManifestMetadata {
716            schema_id: 0,
717            schema: schema.clone(),
718            partition_spec: PartitionSpec::builder(schema)
719                .add_partition_field("category", "category", Transform::Identity)
720                .unwrap()
721                .build()
722                .unwrap(),
723            content: ManifestContentType::Data,
724            format_version: FormatVersion::V1,
725        };
726        let mut entries = vec![
727                ManifestEntry {
728                    status: ManifestStatus::Added,
729                    snapshot_id: Some(0),
730                    sequence_number: Some(0),
731                    file_sequence_number: Some(0),
732                    data_file: DataFile {
733                        content: DataContentType::Data,
734                        file_path: "s3://testbucket/prod/db/sample/data/category=x/00010-1-d5c93668-1e52-41ac-92a6-bba590cbf249-00001.parquet".to_string(),
735                        file_format: DataFileFormat::Parquet,
736                        partition: Struct::from_iter(
737                            vec![
738                                Some(
739                                    Literal::string("x"),
740                                ),
741                            ]
742                                .into_iter()
743                        ),
744                        record_count: 1,
745                        file_size_in_bytes: 874,
746                        column_sizes: HashMap::from([(1, 46), (2, 48), (3, 48)]),
747                        value_counts: HashMap::from([(1, 1), (2, 1), (3, 1)]),
748                        null_value_counts: HashMap::from([(1, 0), (2, 0), (3, 0)]),
749                        nan_value_counts: HashMap::new(),
750                        lower_bounds: HashMap::from([
751                        (1, Datum::long(1)),
752                        (2, Datum::string("a")),
753                        (3, Datum::string("x"))
754                        ]),
755                        upper_bounds: HashMap::from([
756                        (1, Datum::long(1)),
757                        (2, Datum::string("a")),
758                        (3, Datum::string("x"))
759                        ]),
760                        key_metadata: None,
761                        split_offsets: Some(vec![4]),
762                        equality_ids: None,
763                        sort_order_id: Some(0),
764                        partition_spec_id: 0,
765                        first_row_id: None,
766                        referenced_data_file: None,
767                        content_offset: None,
768                        content_size_in_bytes: None,
769                    },
770                }
771            ];
772
773        // write manifest to file
774        let tmp_dir = TempDir::new().unwrap();
775        let path = tmp_dir.path().join("test_manifest.avro");
776        let io = FileIO::new_with_fs();
777        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
778        let mut writer = ManifestWriterBuilder::new(
779            output_file,
780            Some(2),
781            None,
782            metadata.schema.clone(),
783            metadata.partition_spec.clone(),
784        )
785        .build_v1();
786        for entry in &entries {
787            writer.add_entry(entry.clone()).unwrap();
788        }
789        let manifest_file = writer.write_manifest_file().await.unwrap();
790        let partitions = manifest_file.partitions.unwrap();
791        assert_eq!(partitions.len(), 1);
792        assert_eq!(
793            partitions[0].clone().lower_bound.unwrap(),
794            Datum::string("x").to_bytes().unwrap()
795        );
796        assert_eq!(
797            partitions[0].clone().upper_bound.unwrap(),
798            Datum::string("x").to_bytes().unwrap()
799        );
800
801        // read back the manifest file and check the content
802        let actual_manifest =
803            Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
804                .unwrap();
805        // The snapshot id is assigned when the entry is added to the manifest.
806        entries[0].snapshot_id = Some(2);
807        assert_eq!(actual_manifest, Manifest::new(metadata, entries));
808    }
809
810    #[tokio::test]
811    async fn test_parse_manifest_with_schema_evolution() {
812        let schema = Arc::new(
813            Schema::builder()
814                .with_fields(vec![
815                    Arc::new(NestedField::optional(
816                        1,
817                        "id",
818                        Type::Primitive(PrimitiveType::Long),
819                    )),
820                    Arc::new(NestedField::optional(
821                        2,
822                        "v_int",
823                        Type::Primitive(PrimitiveType::Int),
824                    )),
825                ])
826                .build()
827                .unwrap(),
828        );
829        let metadata = ManifestMetadata {
830            schema_id: 0,
831            schema: schema.clone(),
832            partition_spec: PartitionSpec::builder(schema)
833                .with_spec_id(0)
834                .build()
835                .unwrap(),
836            content: ManifestContentType::Data,
837            format_version: FormatVersion::V2,
838        };
839        let entries = vec![ManifestEntry {
840                status: ManifestStatus::Added,
841                snapshot_id: None,
842                sequence_number: None,
843                file_sequence_number: None,
844                data_file: DataFile {
845                    content: DataContentType::Data,
846                    file_format: DataFileFormat::Parquet,
847                    file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
848                    partition: Struct::empty(),
849                    record_count: 1,
850                    file_size_in_bytes: 5442,
851                    column_sizes: HashMap::from([
852                        (1, 61),
853                        (2, 73),
854                        (3, 61),
855                    ]),
856                    value_counts: HashMap::default(),
857                    null_value_counts: HashMap::default(),
858                    nan_value_counts: HashMap::new(),
859                    lower_bounds: HashMap::from([
860                        (1, Datum::long(1)),
861                        (2, Datum::int(2)),
862                        (3, Datum::string("x"))
863                    ]),
864                    upper_bounds: HashMap::from([
865                        (1, Datum::long(1)),
866                        (2, Datum::int(2)),
867                        (3, Datum::string("x"))
868                    ]),
869                    key_metadata: None,
870                    split_offsets: Some(vec![4]),
871                    equality_ids: None,
872                    sort_order_id: None,
873                    partition_spec_id: 0,
874                    first_row_id: None,
875                    referenced_data_file: None,
876                    content_offset: None,
877                    content_size_in_bytes: None,
878                },
879            }];
880
881        // write manifest to file
882        let tmp_dir = TempDir::new().unwrap();
883        let path = tmp_dir.path().join("test_manifest.avro");
884        let io = FileIO::new_with_fs();
885        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
886        let mut writer = ManifestWriterBuilder::new(
887            output_file,
888            Some(2),
889            None,
890            metadata.schema.clone(),
891            metadata.partition_spec.clone(),
892        )
893        .build_v2_data();
894        for entry in &entries {
895            writer.add_entry(entry.clone()).unwrap();
896        }
897        writer.write_manifest_file().await.unwrap();
898
899        // read back the manifest file and check the content
900        let actual_manifest =
901            Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
902                .unwrap();
903
904        // Compared with original manifest, the lower_bounds and upper_bounds no longer has data for field 3, and
905        // other parts should be same.
906        // The snapshot id is assigned when the entry is added to the manifest.
907        let schema = Arc::new(
908            Schema::builder()
909                .with_fields(vec![
910                    Arc::new(NestedField::optional(
911                        1,
912                        "id",
913                        Type::Primitive(PrimitiveType::Long),
914                    )),
915                    Arc::new(NestedField::optional(
916                        2,
917                        "v_int",
918                        Type::Primitive(PrimitiveType::Int),
919                    )),
920                ])
921                .build()
922                .unwrap(),
923        );
924        let expected_manifest = Manifest {
925            metadata: ManifestMetadata {
926                schema_id: 0,
927                schema: schema.clone(),
928                partition_spec: PartitionSpec::builder(schema).with_spec_id(0).build().unwrap(),
929                content: ManifestContentType::Data,
930                format_version: FormatVersion::V2,
931            },
932            entries: vec![Arc::new(ManifestEntry {
933                status: ManifestStatus::Added,
934                snapshot_id: Some(2),
935                sequence_number: None,
936                file_sequence_number: None,
937                data_file: DataFile {
938                    content: DataContentType::Data,
939                    file_format: DataFileFormat::Parquet,
940                    file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
941                    partition: Struct::empty(),
942                    record_count: 1,
943                    file_size_in_bytes: 5442,
944                    column_sizes: HashMap::from([
945                        (1, 61),
946                        (2, 73),
947                        (3, 61),
948                    ]),
949                    value_counts: HashMap::default(),
950                    null_value_counts: HashMap::default(),
951                    nan_value_counts: HashMap::new(),
952                    lower_bounds: HashMap::from([
953                        (1, Datum::long(1)),
954                        (2, Datum::int(2)),
955                    ]),
956                    upper_bounds: HashMap::from([
957                        (1, Datum::long(1)),
958                        (2, Datum::int(2)),
959                    ]),
960                    key_metadata: None,
961                    split_offsets: Some(vec![4]),
962                    equality_ids: None,
963                    sort_order_id: None,
964                    partition_spec_id: 0,
965                    first_row_id: None,
966                    referenced_data_file: None,
967                    content_offset: None,
968                    content_size_in_bytes: None,
969                },
970            })],
971        };
972
973        assert_eq!(actual_manifest, expected_manifest);
974    }
975
976    #[tokio::test]
977    async fn test_manifest_summary() {
978        let schema = Arc::new(
979            Schema::builder()
980                .with_fields(vec![
981                    Arc::new(NestedField::optional(
982                        1,
983                        "time",
984                        Type::Primitive(PrimitiveType::Date),
985                    )),
986                    Arc::new(NestedField::optional(
987                        2,
988                        "v_float",
989                        Type::Primitive(PrimitiveType::Float),
990                    )),
991                    Arc::new(NestedField::optional(
992                        3,
993                        "v_double",
994                        Type::Primitive(PrimitiveType::Double),
995                    )),
996                ])
997                .build()
998                .unwrap(),
999        );
1000        let partition_spec = PartitionSpec::builder(schema.clone())
1001            .with_spec_id(0)
1002            .add_partition_field("time", "year_of_time", Transform::Year)
1003            .unwrap()
1004            .add_partition_field("v_float", "f", Transform::Identity)
1005            .unwrap()
1006            .add_partition_field("v_double", "d", Transform::Identity)
1007            .unwrap()
1008            .build()
1009            .unwrap();
1010        let metadata = ManifestMetadata {
1011            schema_id: 0,
1012            schema,
1013            partition_spec,
1014            content: ManifestContentType::Data,
1015            format_version: FormatVersion::V2,
1016        };
1017        let entries = vec![
1018            ManifestEntry {
1019                status: ManifestStatus::Added,
1020                snapshot_id: None,
1021                sequence_number: None,
1022                file_sequence_number: None,
1023                data_file: DataFile {
1024                    content: DataContentType::Data,
1025                    file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
1026                    file_format: DataFileFormat::Parquet,
1027                    partition: Struct::from_iter(
1028                        vec![
1029                            Some(Literal::int(2021)),
1030                            Some(Literal::float(1.0)),
1031                            Some(Literal::double(2.0)),
1032                        ]
1033                    ),
1034                    record_count: 1,
1035                    file_size_in_bytes: 5442,
1036                    column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
1037                    value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
1038                    null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
1039                    nan_value_counts: HashMap::new(),
1040                    lower_bounds: HashMap::new(),
1041                    upper_bounds: HashMap::new(),
1042                    key_metadata: None,
1043                    split_offsets: Some(vec![4]),
1044                    equality_ids: None,
1045                    sort_order_id: None,
1046                    partition_spec_id: 0,
1047                    first_row_id: None,
1048                    referenced_data_file: None,
1049                    content_offset: None,
1050                    content_size_in_bytes: None,
1051                }
1052            },
1053                ManifestEntry {
1054                    status: ManifestStatus::Added,
1055                    snapshot_id: None,
1056                    sequence_number: None,
1057                    file_sequence_number: None,
1058                    data_file: DataFile {
1059                        content: DataContentType::Data,
1060                        file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
1061                        file_format: DataFileFormat::Parquet,
1062                        partition: Struct::from_iter(
1063                            vec![
1064                                Some(Literal::int(1111)),
1065                                Some(Literal::float(15.5)),
1066                                Some(Literal::double(25.5)),
1067                            ]
1068                        ),
1069                        record_count: 1,
1070                        file_size_in_bytes: 5442,
1071                        column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
1072                        value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
1073                        null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
1074                        nan_value_counts: HashMap::new(),
1075                        lower_bounds: HashMap::new(),
1076                        upper_bounds: HashMap::new(),
1077                        key_metadata: None,
1078                        split_offsets: Some(vec![4]),
1079                        equality_ids: None,
1080                        sort_order_id: None,
1081                        partition_spec_id: 0,
1082                        first_row_id: None,
1083                        referenced_data_file: None,
1084                        content_offset: None,
1085                        content_size_in_bytes: None,
1086                    }
1087                },
1088                ManifestEntry {
1089                    status: ManifestStatus::Added,
1090                    snapshot_id: None,
1091                    sequence_number: None,
1092                    file_sequence_number: None,
1093                    data_file: DataFile {
1094                        content: DataContentType::Data,
1095                        file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
1096                        file_format: DataFileFormat::Parquet,
1097                        partition: Struct::from_iter(
1098                            vec![
1099                                Some(Literal::int(1211)),
1100                                Some(Literal::float(f32::NAN)),
1101                                Some(Literal::double(1.0)),
1102                            ]
1103                        ),
1104                        record_count: 1,
1105                        file_size_in_bytes: 5442,
1106                        column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
1107                        value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
1108                        null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
1109                        nan_value_counts: HashMap::new(),
1110                        lower_bounds: HashMap::new(),
1111                        upper_bounds: HashMap::new(),
1112                        key_metadata: None,
1113                        split_offsets: Some(vec![4]),
1114                        equality_ids: None,
1115                        sort_order_id: None,
1116                        partition_spec_id: 0,
1117                        first_row_id: None,
1118                        referenced_data_file: None,
1119                        content_offset: None,
1120                        content_size_in_bytes: None,
1121                    }
1122                },
1123                ManifestEntry {
1124                    status: ManifestStatus::Added,
1125                    snapshot_id: None,
1126                    sequence_number: None,
1127                    file_sequence_number: None,
1128                    data_file: DataFile {
1129                        content: DataContentType::Data,
1130                        file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
1131                        file_format: DataFileFormat::Parquet,
1132                        partition: Struct::from_iter(
1133                            vec![
1134                                Some(Literal::int(1111)),
1135                                None,
1136                                Some(Literal::double(11.0)),
1137                            ]
1138                        ),
1139                        record_count: 1,
1140                        file_size_in_bytes: 5442,
1141                        column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
1142                        value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
1143                        null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
1144                        nan_value_counts: HashMap::new(),
1145                        lower_bounds: HashMap::new(),
1146                        upper_bounds: HashMap::new(),
1147                        key_metadata: None,
1148                        split_offsets: Some(vec![4]),
1149                        equality_ids: None,
1150                        sort_order_id: None,
1151                        partition_spec_id: 0,
1152                        first_row_id: None,
1153                        referenced_data_file: None,
1154                        content_offset: None,
1155                        content_size_in_bytes: None,
1156                    }
1157                },
1158        ];
1159
1160        // write manifest to file
1161        let tmp_dir = TempDir::new().unwrap();
1162        let path = tmp_dir.path().join("test_manifest.avro");
1163        let io = FileIO::new_with_fs();
1164        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
1165        let mut writer = ManifestWriterBuilder::new(
1166            output_file,
1167            Some(1),
1168            None,
1169            metadata.schema.clone(),
1170            metadata.partition_spec.clone(),
1171        )
1172        .build_v2_data();
1173        for entry in &entries {
1174            writer.add_entry(entry.clone()).unwrap();
1175        }
1176        let res = writer.write_manifest_file().await.unwrap();
1177
1178        let partitions = res.partitions.unwrap();
1179
1180        assert_eq!(partitions.len(), 3);
1181        assert_eq!(
1182            partitions[0].clone().lower_bound.unwrap(),
1183            Datum::int(1111).to_bytes().unwrap()
1184        );
1185        assert_eq!(
1186            partitions[0].clone().upper_bound.unwrap(),
1187            Datum::int(2021).to_bytes().unwrap()
1188        );
1189        assert!(!partitions[0].clone().contains_null);
1190        assert_eq!(partitions[0].clone().contains_nan, Some(false));
1191
1192        assert_eq!(
1193            partitions[1].clone().lower_bound.unwrap(),
1194            Datum::float(1.0).to_bytes().unwrap()
1195        );
1196        assert_eq!(
1197            partitions[1].clone().upper_bound.unwrap(),
1198            Datum::float(15.5).to_bytes().unwrap()
1199        );
1200        assert!(partitions[1].clone().contains_null);
1201        assert_eq!(partitions[1].clone().contains_nan, Some(true));
1202
1203        assert_eq!(
1204            partitions[2].clone().lower_bound.unwrap(),
1205            Datum::double(1.0).to_bytes().unwrap()
1206        );
1207        assert_eq!(
1208            partitions[2].clone().upper_bound.unwrap(),
1209            Datum::double(25.5).to_bytes().unwrap()
1210        );
1211        assert!(!partitions[2].clone().contains_null);
1212        assert_eq!(partitions[2].clone().contains_nan, Some(false));
1213    }
1214
1215    #[test]
1216    fn test_data_file_serialization() {
1217        // Create a simple schema
1218        let schema = Schema::builder()
1219            .with_schema_id(1)
1220            .with_identifier_field_ids(vec![1])
1221            .with_fields(vec![
1222                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
1223                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
1224            ])
1225            .build()
1226            .unwrap();
1227
1228        // Create a partition spec
1229        let partition_spec = PartitionSpec::builder(schema.clone())
1230            .with_spec_id(1)
1231            .add_partition_field("id", "id_partition", Transform::Identity)
1232            .unwrap()
1233            .build()
1234            .unwrap();
1235
1236        // Get partition type from the partition spec
1237        let partition_type = partition_spec.partition_type(&schema).unwrap();
1238
1239        // Create a vector of DataFile objects
1240        let data_files = vec![
1241            DataFileBuilder::default()
1242                .content(DataContentType::Data)
1243                .file_format(DataFileFormat::Parquet)
1244                .file_path("path/to/file1.parquet".to_string())
1245                .file_size_in_bytes(1024)
1246                .record_count(100)
1247                .partition_spec_id(1)
1248                .partition(Struct::empty())
1249                .column_sizes(HashMap::from([(1, 512), (2, 1024)]))
1250                .value_counts(HashMap::from([(1, 100), (2, 500)]))
1251                .null_value_counts(HashMap::from([(1, 0), (2, 1)]))
1252                .build()
1253                .unwrap(),
1254            DataFileBuilder::default()
1255                .content(DataContentType::Data)
1256                .file_format(DataFileFormat::Parquet)
1257                .file_path("path/to/file2.parquet".to_string())
1258                .file_size_in_bytes(2048)
1259                .record_count(200)
1260                .partition_spec_id(1)
1261                .partition(Struct::empty())
1262                .column_sizes(HashMap::from([(1, 1024), (2, 2048)]))
1263                .value_counts(HashMap::from([(1, 200), (2, 600)]))
1264                .null_value_counts(HashMap::from([(1, 10), (2, 999)]))
1265                .build()
1266                .unwrap(),
1267        ];
1268
1269        // Serialize the DataFile objects
1270        let serialized_files = data_files
1271            .clone()
1272            .into_iter()
1273            .map(|f| serialize_data_file_to_json(f, &partition_type, FormatVersion::V2).unwrap())
1274            .collect::<Vec<String>>();
1275
1276        // Verify we have the expected serialized files
1277        assert_eq!(serialized_files.len(), 2);
1278        let pretty_json1: Value = serde_json::from_str(serialized_files.first().unwrap()).unwrap();
1279        let pretty_json2: Value = serde_json::from_str(serialized_files.get(1).unwrap()).unwrap();
1280        let expected_serialized_file1 = serde_json::json!({
1281            "content": 0,
1282            "file_path": "path/to/file1.parquet",
1283            "file_format": "PARQUET",
1284            "partition": {},
1285            "record_count": 100,
1286            "file_size_in_bytes": 1024,
1287            "column_sizes": [
1288                { "key": 1, "value": 512 },
1289                { "key": 2, "value": 1024 }
1290            ],
1291            "value_counts": [
1292                { "key": 1, "value": 100 },
1293                { "key": 2, "value": 500 }
1294            ],
1295            "null_value_counts": [
1296                { "key": 1, "value": 0 },
1297                { "key": 2, "value": 1 }
1298            ],
1299            "nan_value_counts": [],
1300            "lower_bounds": [],
1301            "upper_bounds": [],
1302            "key_metadata": null,
1303            "split_offsets": null,
1304            "equality_ids": null,
1305            "sort_order_id": null,
1306            "first_row_id": null,
1307            "referenced_data_file": null,
1308            "content_offset": null,
1309            "content_size_in_bytes": null
1310        });
1311        let expected_serialized_file2 = serde_json::json!({
1312            "content": 0,
1313            "file_path": "path/to/file2.parquet",
1314            "file_format": "PARQUET",
1315            "partition": {},
1316            "record_count": 200,
1317            "file_size_in_bytes": 2048,
1318            "column_sizes": [
1319                { "key": 1, "value": 1024 },
1320                { "key": 2, "value": 2048 }
1321            ],
1322            "value_counts": [
1323                { "key": 1, "value": 200 },
1324                { "key": 2, "value": 600 }
1325            ],
1326            "null_value_counts": [
1327                { "key": 1, "value": 10 },
1328                { "key": 2, "value": 999 }
1329            ],
1330            "nan_value_counts": [],
1331            "lower_bounds": [],
1332            "upper_bounds": [],
1333            "key_metadata": null,
1334            "split_offsets": null,
1335            "equality_ids": null,
1336            "sort_order_id": null,
1337            "first_row_id": null,
1338            "referenced_data_file": null,
1339            "content_offset": null,
1340            "content_size_in_bytes": null
1341        });
1342        assert_eq!(pretty_json1, expected_serialized_file1);
1343        assert_eq!(pretty_json2, expected_serialized_file2);
1344
1345        // Now deserialize the JSON strings back into DataFile objects
1346        let deserialized_files: Vec<DataFile> = serialized_files
1347            .into_iter()
1348            .map(|json| {
1349                deserialize_data_file_from_json(
1350                    &json,
1351                    partition_spec.spec_id(),
1352                    &partition_type,
1353                    &schema,
1354                )
1355                .unwrap()
1356            })
1357            .collect();
1358
1359        // Verify we have the expected number of deserialized files
1360        assert_eq!(deserialized_files.len(), 2);
1361        let deserialized_data_file1 = deserialized_files.first().unwrap();
1362        let deserialized_data_file2 = deserialized_files.get(1).unwrap();
1363        let original_data_file1 = data_files.first().unwrap();
1364        let original_data_file2 = data_files.get(1).unwrap();
1365
1366        assert_eq!(deserialized_data_file1, original_data_file1);
1367        assert_eq!(deserialized_data_file2, original_data_file2);
1368    }
1369}