iceberg/spec/manifest/_serde.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;

use serde_derive::{Deserialize, Serialize};
use serde_with::serde_as;

use super::{Datum, ManifestEntry, Schema, Struct};
use crate::spec::{FormatVersion, Literal, RawLiteral, StructType, Type};
use crate::{Error, ErrorKind, metadata_columns};

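/// Serde representation of a manifest entry in the V2 format, where the
/// snapshot id and sequence numbers are optional in the Avro schema.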
#[derive(Serialize, Deserialize)]
pub(super) struct ManifestEntryV2 {
    status: i32,
    snapshot_id: Option<i64>,
    sequence_number: Option<i64>,
    file_sequence_number: Option<i64>,
    data_file: DataFileSerde,
}

impl ManifestEntryV2 {
    pub fn try_from(value: ManifestEntry, partition_type: &StructType) -> Result<Self, Error> {
        Ok(Self {
            status: value.status as i32,
            snapshot_id: value.snapshot_id,
            sequence_number: value.sequence_number,
            file_sequence_number: value.file_sequence_number,
            data_file: DataFileSerde::try_from(value.data_file, partition_type, FormatVersion::V2)?,
        })
    }

    pub fn try_into(
        self,
        partition_spec_id: i32,
        partition_type: &StructType,
        schema: &Schema,
    ) -> Result<ManifestEntry, Error> {
        Ok(ManifestEntry {
            status: self.status.try_into()?,
            snapshot_id: self.snapshot_id,
            sequence_number: self.sequence_number,
            file_sequence_number: self.file_sequence_number,
            data_file: self
                .data_file
                .try_into(partition_spec_id, partition_type, schema)?,
        })
    }
}

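/// Serde representation of a manifest entry in the V1 format, where the
/// snapshot id is required and sequence numbers do not exist yet.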
#[derive(Serialize, Deserialize)]
pub(super) struct ManifestEntryV1 {
    status: i32,
    pub snapshot_id: i64,
    data_file: DataFileSerde,
}

impl ManifestEntryV1 {
    pub fn try_from(value: ManifestEntry, partition_type: &StructType) -> Result<Self, Error> {
        Ok(Self {
            status: value.status as i32,
            snapshot_id: value.snapshot_id.unwrap_or_default(),
            data_file: DataFileSerde::try_from(value.data_file, partition_type, FormatVersion::V1)?,
        })
    }

    pub fn try_into(
        self,
        partition_spec_id: i32,
        partition_type: &StructType,
        schema: &Schema,
    ) -> Result<ManifestEntry, Error> {
        Ok(ManifestEntry {
            status: self.status.try_into()?,
            snapshot_id: Some(self.snapshot_id),
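            // V1 manifests predate sequence numbers; default both to 0 when
            // projecting a V1 entry into the in-memory representation.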
            sequence_number: Some(0),
            file_sequence_number: Some(0),
            data_file: self
                .data_file
                .try_into(partition_spec_id, partition_type, schema)?,
        })
    }
}

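/// Serde representation of a data file, shared between the V1 and V2 manifest
/// schemas: version-specific fields are optional (or `#[serde(default)]`) so
/// one struct can round-trip both formats.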
#[serde_as]
#[derive(Serialize, Deserialize)]
pub(super) struct DataFileSerde {
    #[serde(default)]
    content: i32,
    file_path: String,
    file_format: String,
    partition: RawLiteral,
    record_count: i64,
    file_size_in_bytes: i64,
    #[serde(skip_deserializing, skip_serializing_if = "Option::is_none")]
    block_size_in_bytes: Option<i64>,
    column_sizes: Option<Vec<I64Entry>>,
    value_counts: Option<Vec<I64Entry>>,
    null_value_counts: Option<Vec<I64Entry>>,
    nan_value_counts: Option<Vec<I64Entry>>,
    lower_bounds: Option<Vec<BytesEntry>>,
    upper_bounds: Option<Vec<BytesEntry>>,
    key_metadata: Option<serde_bytes::ByteBuf>,
    split_offsets: Option<Vec<i64>>,
    equality_ids: Option<Vec<i32>>,
    sort_order_id: Option<i32>,
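    // Fields below were added by newer format versions (row lineage and
    // deletion vectors) and deserialize to `None` from older manifests.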
    first_row_id: Option<i64>,
    referenced_data_file: Option<String>,
    content_offset: Option<i64>,
    content_size_in_bytes: Option<i64>,
}

impl DataFileSerde {
    pub fn try_from(
        value: super::DataFile,
        partition_type: &StructType,
        format_version: FormatVersion,
    ) -> Result<Self, Error> {
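        // `block_size_in_bytes` is required by the V1 manifest schema but was
        // removed in V2: write a placeholder 0 for V1 and omit it for V2.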
        let block_size_in_bytes = if format_version == FormatVersion::V1 {
            Some(0)
        } else {
            None
        };
        Ok(Self {
            content: value.content as i32,
            file_path: value.file_path,
            file_format: value.file_format.to_string().to_ascii_uppercase(),
            partition: RawLiteral::try_from(
                Literal::Struct(value.partition),
                &Type::Struct(partition_type.clone()),
            )?,
            record_count: value.record_count.try_into()?,
            file_size_in_bytes: value.file_size_in_bytes.try_into()?,
            block_size_in_bytes,
            column_sizes: Some(to_i64_entry(value.column_sizes)?),
            value_counts: Some(to_i64_entry(value.value_counts)?),
            null_value_counts: Some(to_i64_entry(value.null_value_counts)?),
            nan_value_counts: Some(to_i64_entry(value.nan_value_counts)?),
            lower_bounds: Some(to_bytes_entry(value.lower_bounds)?),
            upper_bounds: Some(to_bytes_entry(value.upper_bounds)?),
            key_metadata: value.key_metadata.map(serde_bytes::ByteBuf::from),
            split_offsets: value.split_offsets,
            equality_ids: value.equality_ids,
            sort_order_id: value.sort_order_id,
            first_row_id: value.first_row_id,
            referenced_data_file: value.referenced_data_file,
            content_offset: value.content_offset,
            content_size_in_bytes: value.content_size_in_bytes,
        })
    }

    pub fn try_into(
        self,
        partition_spec_id: i32,
        partition_type: &StructType,
        schema: &Schema,
    ) -> Result<super::DataFile, Error> {
        let partition = self
            .partition
            .try_into(&Type::Struct(partition_type.clone()))?
            .map(|v| {
                if let Literal::Struct(v) = v {
                    Ok(v)
                } else {
                    Err(Error::new(
                        ErrorKind::DataInvalid,
                        "partition value is not a struct",
                    ))
                }
            })
            .transpose()?
            .unwrap_or(Struct::empty());
        Ok(super::DataFile {
            content: self.content.try_into()?,
            file_path: self.file_path,
            file_format: self.file_format.parse()?,
            partition,
            record_count: self.record_count.try_into()?,
            file_size_in_bytes: self.file_size_in_bytes.try_into()?,
            column_sizes: self
                .column_sizes
                .map(parse_i64_entry)
                .transpose()?
                .unwrap_or_default(),
            value_counts: self
                .value_counts
                .map(parse_i64_entry)
                .transpose()?
                .unwrap_or_default(),
            null_value_counts: self
                .null_value_counts
                .map(parse_i64_entry)
                .transpose()?
                .unwrap_or_default(),
            nan_value_counts: self
                .nan_value_counts
                .map(parse_i64_entry)
                .transpose()?
                .unwrap_or_default(),
            lower_bounds: self
                .lower_bounds
                .map(|v| parse_bytes_entry(v, schema))
                .transpose()?
                .unwrap_or_default(),
            upper_bounds: self
                .upper_bounds
                .map(|v| parse_bytes_entry(v, schema))
                .transpose()?
                .unwrap_or_default(),
            key_metadata: self.key_metadata.map(|v| v.to_vec()),
            split_offsets: self.split_offsets,
            equality_ids: self.equality_ids,
            sort_order_id: self.sort_order_id,
            partition_spec_id,
            first_row_id: self.first_row_id,
            referenced_data_file: self.referenced_data_file,
            content_offset: self.content_offset,
            content_size_in_bytes: self.content_size_in_bytes,
        })
    }
}

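/// Avro entry for the `lower_bounds`/`upper_bounds` lists: a column field id
/// paired with the serialized bound value.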
#[serde_as]
#[derive(Serialize, Deserialize)]
#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
struct BytesEntry {
    key: i32,
    value: serde_bytes::ByteBuf,
}

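/// Decodes bound entries into [`Datum`]s, resolving each field id against the
/// schema or the reserved metadata columns; unknown ids are skipped so that
/// files written against older schemas still parse.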
fn parse_bytes_entry(v: Vec<BytesEntry>, schema: &Schema) -> Result<HashMap<i32, Datum>, Error> {
    let mut m = HashMap::with_capacity(v.len());
    for entry in v {
        // First try to find the field in the schema, or check if it's a reserved metadata field
        let field = schema
            .field_by_id(entry.key)
            .or_else(|| metadata_columns::get_metadata_field(entry.key).ok());

        if let Some(field) = field {
            let data_type = field
                .field_type
                .as_primitive_type()
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!("field {} is not a primitive type", field.name),
                    )
                })?
                .clone();
            m.insert(entry.key, Datum::try_from_bytes(&entry.value, data_type)?);
        }
        // We ignore the entry if the field is not found in schema or metadata columns (schema evolution).
    }
    Ok(m)
}

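/// Serializes `(field id, Datum)` bound pairs into [`BytesEntry`] records.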
fn to_bytes_entry(v: impl IntoIterator<Item = (i32, Datum)>) -> Result<Vec<BytesEntry>, Error> {
    let iter = v.into_iter();
    // Reserve the capacity to the lower bound.
    let mut bs = Vec::with_capacity(iter.size_hint().0);
    for (k, d) in iter {
        bs.push(BytesEntry {
            key: k,
            value: d.to_bytes()?,
        });
    }
    Ok(bs)
}

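/// Avro entry for the per-column count lists (`column_sizes`, `value_counts`,
/// `null_value_counts`, `nan_value_counts`).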
#[derive(Serialize, Deserialize)]
#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
struct I64Entry {
    key: i32,
    value: i64,
}

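/// Converts count entries into a map, dropping entries with negative values
/// since counts can never be negative.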
fn parse_i64_entry(v: Vec<I64Entry>) -> Result<HashMap<i32, u64>, Error> {
    let mut m = HashMap::with_capacity(v.len());
    for entry in v {
        // We ignore the entry if its value is negative, since these entries represent
        // counts, which should never be negative.
        if let Ok(v) = entry.value.try_into() {
            m.insert(entry.key, v);
        }
    }
    Ok(m)
}

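/// Converts a count map into [`I64Entry`] records, failing if a `u64` count
/// overflows `i64`.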
#[allow(unused_mut)]
fn to_i64_entry(entries: HashMap<i32, u64>) -> Result<Vec<I64Entry>, Error> {
    let mut i64_entries = entries
        .iter()
        .map(|e| {
            Ok(I64Entry {
                key: *e.0,
                value: (*e.1).try_into()?,
            })
        })
        .collect::<Result<Vec<_>, Error>>()?;

    // Ensure that the order is deterministic during testing
    #[cfg(test)]
    i64_entries.sort_by_key(|e| e.key);

    Ok(i64_entries)
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::io::Cursor;
    use std::sync::Arc;

    use crate::spec::manifest::_serde::{I64Entry, parse_i64_entry};
    use crate::spec::{
        DataContentType, DataFile, DataFileFormat, Datum, FormatVersion, NestedField,
        PrimitiveType, Schema, Struct, StructType, Type, read_data_files_from_avro,
        write_data_files_to_avro,
    };

    #[test]
    fn test_parse_negative_manifest_entry() {
        let entries = vec![
            I64Entry { key: 1, value: -1 },
            I64Entry { key: 2, value: 3 },
        ];

        let ret = parse_i64_entry(entries).unwrap();

        let expected_ret = HashMap::from([(2, 3)]);
        assert_eq!(ret, expected_ret, "Negative i64 entry should be ignored!");
    }

    fn schema() -> Arc<Schema> {
        Arc::new(
            Schema::builder()
                .with_fields(vec![
                    Arc::new(NestedField::optional(
                        1,
                        "v1",
                        Type::Primitive(PrimitiveType::Int),
                    )),
                    Arc::new(NestedField::optional(
                        2,
                        "v2",
                        Type::Primitive(PrimitiveType::String),
                    )),
                    Arc::new(NestedField::optional(
                        3,
                        "v3",
                        Type::Primitive(PrimitiveType::String),
                    )),
                ])
                .build()
                .unwrap(),
        )
    }

    fn data_files() -> Vec<DataFile> {
        vec![DataFile {
            content: DataContentType::Data,
            file_path: "s3://testbucket/iceberg_data/iceberg_ctl/iceberg_db/iceberg_tbl/data/00000-7-45268d71-54eb-476c-b42c-942d880c04a1-00001.parquet".to_string(),
            file_format: DataFileFormat::Parquet,
            partition: Struct::empty(),
            record_count: 1,
            file_size_in_bytes: 875,
            column_sizes: HashMap::from([(1, 47), (2, 48), (3, 52)]),
            value_counts: HashMap::from([(1, 1), (2, 1), (3, 1)]),
            null_value_counts: HashMap::from([(1, 0), (2, 0), (3, 0)]),
            nan_value_counts: HashMap::new(),
            lower_bounds: HashMap::from([(1, Datum::int(1)), (2, Datum::string("a")), (3, Datum::string("AC/DC"))]),
            upper_bounds: HashMap::from([(1, Datum::int(1)), (2, Datum::string("a")), (3, Datum::string("AC/DC"))]),
            key_metadata: None,
            split_offsets: Some(vec![4]),
            equality_ids: None,
            sort_order_id: Some(0),
            partition_spec_id: 0,
            first_row_id: None,
            referenced_data_file: None,
            content_offset: None,
            content_size_in_bytes: None,
        }]
    }

    #[tokio::test]
    async fn test_data_file_serialize_deserialize() {
        let schema = schema();
        let data_files = data_files();

        let mut buffer = Vec::new();
        let _ = write_data_files_to_avro(
            &mut buffer,
            data_files.clone().into_iter(),
            &StructType::new(vec![]),
            FormatVersion::V2,
        )
        .unwrap();

        let actual_data_file = read_data_files_from_avro(
            &mut Cursor::new(buffer),
            &schema,
            0,
            &StructType::new(vec![]),
            FormatVersion::V2,
        )
        .unwrap();

        assert_eq!(data_files, actual_data_file);
    }

    #[tokio::test]
    async fn test_data_file_serialize_deserialize_v1_data_on_v2_reader() {
        let schema = schema();
        let data_files = data_files();

        let mut buffer = Vec::new();
        let _ = write_data_files_to_avro(
            &mut buffer,
            data_files.clone().into_iter(),
            &StructType::new(vec![]),
            FormatVersion::V1,
        )
        .unwrap();

        let actual_data_file = read_data_files_from_avro(
            &mut Cursor::new(buffer),
            &schema,
            0,
            &StructType::new(vec![]),
            FormatVersion::V2,
        )
        .unwrap();

        assert_eq!(actual_data_file[0].content, DataContentType::Data);
    }

    #[test]
    fn test_manifest_entry_v1_to_v2_projection() {
        use crate::spec::manifest::_serde::{DataFileSerde, ManifestEntryV1};
        use crate::spec::{Literal, RawLiteral, Struct, StructType};

        let partition = RawLiteral::try_from(
            Literal::Struct(Struct::empty()),
            &Type::Struct(StructType::new(vec![])),
        )
        .unwrap();

        // Create a V1 manifest entry struct (lacks V2 sequence number fields)
        let v1_entry = ManifestEntryV1 {
            status: 1, // Added
            snapshot_id: 12345,
            data_file: DataFileSerde {
                content: 0, // DataFileSerde is shared between V1/V2
                file_path: "test/path.parquet".to_string(),
                file_format: "PARQUET".to_string(),
                partition,
                record_count: 100,
                file_size_in_bytes: 1024,
                block_size_in_bytes: Some(0), // V1 includes this field
                column_sizes: None,
                value_counts: None,
                null_value_counts: None,
                nan_value_counts: None,
                lower_bounds: None,
                upper_bounds: None,
                key_metadata: None,
                split_offsets: None,
                equality_ids: None, // Preserved as None by the conversion
                sort_order_id: None,
                first_row_id: None,
                referenced_data_file: None,
                content_offset: None,
                content_size_in_bytes: None,
            },
        };

        // Test the explicit V1→V2 conversion logic in ManifestEntryV1::try_into()
        let v2_entry = v1_entry
            .try_into(
                0, // partition_spec_id
                &StructType::new(vec![]),
                &schema(),
            )
            .unwrap();

        // Verify that V1→V2 conversion adds the missing V2 sequence number fields
        assert_eq!(
            v2_entry.sequence_number,
            Some(0),
            "ManifestEntryV1::try_into() should set sequence_number to 0"
        );
        assert_eq!(
            v2_entry.file_sequence_number,
            Some(0),
            "ManifestEntryV1::try_into() should set file_sequence_number to 0"
        );
        assert_eq!(
            v2_entry.snapshot_id,
            Some(12345),
            "snapshot_id should be preserved during conversion"
        );

        // Verify that DataFileSerde conversion applies V2 defaults
        assert_eq!(
            v2_entry.data_file.content,
            DataContentType::Data,
            "DataFileSerde should convert content 0 to DataContentType::Data"
        );
        assert_eq!(
            v2_entry.data_file.equality_ids, None,
            "DataFileSerde should preserve None equality_ids as None"
        );

        // Verify other fields are preserved during conversion
        assert_eq!(v2_entry.data_file.file_path, "test/path.parquet");
        assert_eq!(v2_entry.data_file.record_count, 100);
        assert_eq!(v2_entry.data_file.file_size_in_bytes, 1024);
    }

    #[test]
    fn test_data_file_serde_v1_field_defaults() {
        use crate::spec::manifest::_serde::DataFileSerde;
        use crate::spec::{Literal, RawLiteral, Struct, StructType};

        let partition = RawLiteral::try_from(
            Literal::Struct(Struct::empty()),
            &Type::Struct(StructType::new(vec![])),
        )
        .unwrap();

        // Create a DataFileSerde that simulates V1 deserialization behavior
        // (missing V2 fields would be None due to #[serde(default)])
        let v1_style_data_file = DataFileSerde {
            content: 0, // V1 doesn't have this field, defaults to 0 via #[serde(default)]
            file_path: "test/data.parquet".to_string(),
            file_format: "PARQUET".to_string(),
            partition,
            record_count: 500,
            file_size_in_bytes: 2048,
            block_size_in_bytes: Some(1024), // V1 includes this field, V2 skips it
            column_sizes: None,
            value_counts: None,
            null_value_counts: None,
            nan_value_counts: None,
            lower_bounds: None,
            upper_bounds: None,
            key_metadata: None,
            split_offsets: None,
            equality_ids: None, // V1 doesn't have this field; Option fields default to None when absent
            sort_order_id: None,
            first_row_id: None,
            referenced_data_file: None,
            content_offset: None,
            content_size_in_bytes: None,
        };

        // Test the DataFileSerde::try_into() conversion that handles V1 field defaults
        let data_file = v1_style_data_file
            .try_into(
                0, // partition_spec_id
                &StructType::new(vec![]),
                &schema(),
            )
            .unwrap();

        // Verify that DataFileSerde::try_into() applies correct defaults for missing V2 fields
        assert_eq!(
            data_file.content,
            DataContentType::Data,
            "content 0 should convert to DataContentType::Data"
        );
        assert_eq!(
            data_file.equality_ids, None,
            "None equality_ids should remain as None"
        );

        // Verify other fields are handled correctly during conversion
        assert_eq!(data_file.file_path, "test/data.parquet");
        assert_eq!(data_file.file_format, DataFileFormat::Parquet);
        assert_eq!(data_file.record_count, 500);
        assert_eq!(data_file.file_size_in_bytes, 2048);
        assert_eq!(data_file.partition_spec_id, 0);
    }

    #[test]
    fn test_parse_bytes_entry_with_metadata_column() {
        use crate::metadata_columns::RESERVED_FIELD_ID_POS;
        use crate::spec::manifest::_serde::{BytesEntry, parse_bytes_entry};

        // Create a minimal schema that doesn't include the _pos metadata column
        let test_schema = schema();

        // Create a BytesEntry with the _pos field ID (reserved metadata field)
        // The _pos field is a Long (i64) in the metadata column definition
        let pos_value: i64 = 42;
        let bytes_entry = BytesEntry {
            key: RESERVED_FIELD_ID_POS,
            value: serde_bytes::ByteBuf::from(pos_value.to_le_bytes().to_vec()),
        };

        // Parse the bytes entry - should use metadata column definition for _pos
        let result = parse_bytes_entry(vec![bytes_entry], &test_schema).unwrap();

        // Verify that the _pos field was parsed correctly using metadata column definition
        assert!(
            result.contains_key(&RESERVED_FIELD_ID_POS),
            "_pos metadata field should be parsed"
        );
        assert_eq!(
            result.get(&RESERVED_FIELD_ID_POS),
            Some(&Datum::long(pos_value)),
            "_pos should be parsed as long with correct value"
        );
    }
}