iceberg/spec/manifest/
entry.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use apache_avro::Schema as AvroSchema;
21use once_cell::sync::Lazy;
22use typed_builder::TypedBuilder;
23
24use crate::avro::schema_to_avro_schema;
25use crate::error::Result;
26use crate::spec::{
27    DataContentType, DataFile, INITIAL_SEQUENCE_NUMBER, ListType, Literal, ManifestFile, MapType,
28    NestedField, NestedFieldRef, PrimitiveLiteral, PrimitiveType, Schema, StructType, Type,
29};
30use crate::{Error, ErrorKind};
31
32/// Reference to [`ManifestEntry`].
33pub type ManifestEntryRef = Arc<ManifestEntry>;
34
35/// A manifest is an immutable Avro file that lists data files or delete
36/// files, along with each file’s partition data tuple, metrics, and tracking
37/// information.
38#[derive(Debug, PartialEq, Eq, Clone, TypedBuilder)]
39pub struct ManifestEntry {
40    /// field: 0
41    ///
42    /// Used to track additions and deletions.
43    pub status: ManifestStatus,
44    /// field id: 1
45    ///
46    /// Snapshot id where the file was added, or deleted if status is 2.
47    /// Inherited when null.
48    #[builder(default, setter(strip_option(fallback = snapshot_id_opt)))]
49    pub snapshot_id: Option<i64>,
50    /// field id: 3
51    ///
52    /// Data sequence number of the file.
53    /// Inherited when null and status is 1 (added).
54    #[builder(default, setter(strip_option(fallback = sequence_number_opt)))]
55    pub sequence_number: Option<i64>,
56    /// field id: 4
57    ///
58    /// File sequence number indicating when the file was added.
59    /// Inherited when null and status is 1 (added).
60    #[builder(default, setter(strip_option(fallback = file_sequence_number_opt)))]
61    pub file_sequence_number: Option<i64>,
62    /// field id: 2
63    ///
64    /// File path, partition tuple, metrics, …
65    pub data_file: DataFile,
66}
67
68impl ManifestEntry {
69    /// Check if this manifest entry is deleted.
70    pub fn is_alive(&self) -> bool {
71        matches!(
72            self.status,
73            ManifestStatus::Added | ManifestStatus::Existing
74        )
75    }
76
77    /// Status of this manifest entry
78    pub fn status(&self) -> ManifestStatus {
79        self.status
80    }
81
82    /// Content type of this manifest entry.
83    #[inline]
84    pub fn content_type(&self) -> DataContentType {
85        self.data_file.content
86    }
87
88    /// File format of this manifest entry.
89    #[inline]
90    pub fn file_format(&self) -> DataFileFormat {
91        self.data_file.file_format
92    }
93
94    /// Data file path of this manifest entry.
95    #[inline]
96    pub fn file_path(&self) -> &str {
97        &self.data_file.file_path
98    }
99
100    /// Data file record count of the manifest entry.
101    #[inline]
102    pub fn record_count(&self) -> u64 {
103        self.data_file.record_count
104    }
105
106    /// Inherit data from manifest list, such as snapshot id, sequence number.
107    pub(crate) fn inherit_data(&mut self, snapshot_entry: &ManifestFile) {
108        if self.snapshot_id.is_none() {
109            self.snapshot_id = Some(snapshot_entry.added_snapshot_id);
110        }
111
112        if self.sequence_number.is_none()
113            && (self.status == ManifestStatus::Added
114                || snapshot_entry.sequence_number == INITIAL_SEQUENCE_NUMBER)
115        {
116            self.sequence_number = Some(snapshot_entry.sequence_number);
117        }
118
119        if self.file_sequence_number.is_none()
120            && (self.status == ManifestStatus::Added
121                || snapshot_entry.sequence_number == INITIAL_SEQUENCE_NUMBER)
122        {
123            self.file_sequence_number = Some(snapshot_entry.sequence_number);
124        }
125    }
126
127    /// Snapshot id
128    #[inline]
129    pub fn snapshot_id(&self) -> Option<i64> {
130        self.snapshot_id
131    }
132
133    /// Data sequence number.
134    #[inline]
135    pub fn sequence_number(&self) -> Option<i64> {
136        self.sequence_number
137    }
138
139    /// File size in bytes.
140    #[inline]
141    pub fn file_size_in_bytes(&self) -> u64 {
142        self.data_file.file_size_in_bytes
143    }
144
145    /// get a reference to the actual data file
146    #[inline]
147    pub fn data_file(&self) -> &DataFile {
148        &self.data_file
149    }
150}
151
/// Tracking state of a [`ManifestEntry`], used to record file additions and
/// deletions.
///
/// The explicit discriminants are the integer codes used for the entry's
/// `status` field (see the `TryFrom<i32>` impl).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ManifestStatus {
    /// Encoded as `0`.
    Existing = 0,
    /// Encoded as `1`.
    Added = 1,
    /// Encoded as `2`.
    ///
    /// Deletes are informational only and not used in scans.
    Deleted = 2,
}
164
165impl TryFrom<i32> for ManifestStatus {
166    type Error = Error;
167
168    fn try_from(v: i32) -> Result<ManifestStatus> {
169        match v {
170            0 => Ok(ManifestStatus::Existing),
171            1 => Ok(ManifestStatus::Added),
172            2 => Ok(ManifestStatus::Deleted),
173            _ => Err(Error::new(
174                ErrorKind::DataInvalid,
175                format!("manifest status {v} is invalid"),
176            )),
177        }
178    }
179}
180
181use super::DataFileFormat;
182
183static STATUS: Lazy<NestedFieldRef> = {
184    Lazy::new(|| {
185        Arc::new(NestedField::required(
186            0,
187            "status",
188            Type::Primitive(PrimitiveType::Int),
189        ))
190    })
191};
192
193static SNAPSHOT_ID_V1: Lazy<NestedFieldRef> = {
194    Lazy::new(|| {
195        Arc::new(NestedField::required(
196            1,
197            "snapshot_id",
198            Type::Primitive(PrimitiveType::Long),
199        ))
200    })
201};
202
203static SNAPSHOT_ID_V2: Lazy<NestedFieldRef> = {
204    Lazy::new(|| {
205        Arc::new(NestedField::optional(
206            1,
207            "snapshot_id",
208            Type::Primitive(PrimitiveType::Long),
209        ))
210    })
211};
212
213static SEQUENCE_NUMBER: Lazy<NestedFieldRef> = {
214    Lazy::new(|| {
215        Arc::new(NestedField::optional(
216            3,
217            "sequence_number",
218            Type::Primitive(PrimitiveType::Long),
219        ))
220    })
221};
222
223static FILE_SEQUENCE_NUMBER: Lazy<NestedFieldRef> = {
224    Lazy::new(|| {
225        Arc::new(NestedField::optional(
226            4,
227            "file_sequence_number",
228            Type::Primitive(PrimitiveType::Long),
229        ))
230    })
231};
232
233static CONTENT: Lazy<NestedFieldRef> = {
234    Lazy::new(|| {
235        Arc::new(
236            NestedField::required(134, "content", Type::Primitive(PrimitiveType::Int))
237                // 0 refers to DataContentType::DATA
238                .with_initial_default(Literal::Primitive(PrimitiveLiteral::Int(0))),
239        )
240    })
241};
242
243static FILE_PATH: Lazy<NestedFieldRef> = {
244    Lazy::new(|| {
245        Arc::new(NestedField::required(
246            100,
247            "file_path",
248            Type::Primitive(PrimitiveType::String),
249        ))
250    })
251};
252
253static FILE_FORMAT: Lazy<NestedFieldRef> = {
254    Lazy::new(|| {
255        Arc::new(NestedField::required(
256            101,
257            "file_format",
258            Type::Primitive(PrimitiveType::String),
259        ))
260    })
261};
262
263static RECORD_COUNT: Lazy<NestedFieldRef> = {
264    Lazy::new(|| {
265        Arc::new(NestedField::required(
266            103,
267            "record_count",
268            Type::Primitive(PrimitiveType::Long),
269        ))
270    })
271};
272
273static FILE_SIZE_IN_BYTES: Lazy<NestedFieldRef> = {
274    Lazy::new(|| {
275        Arc::new(NestedField::required(
276            104,
277            "file_size_in_bytes",
278            Type::Primitive(PrimitiveType::Long),
279        ))
280    })
281};
282
283// Deprecated. Always write a default in v1. Do not write in v2.
284static BLOCK_SIZE_IN_BYTES: Lazy<NestedFieldRef> = {
285    Lazy::new(|| {
286        Arc::new(NestedField::required(
287            105,
288            "block_size_in_bytes",
289            Type::Primitive(PrimitiveType::Long),
290        ))
291    })
292};
293
294static COLUMN_SIZES: Lazy<NestedFieldRef> = {
295    Lazy::new(|| {
296        Arc::new(NestedField::optional(
297            108,
298            "column_sizes",
299            Type::Map(MapType {
300                key_field: Arc::new(NestedField::required(
301                    117,
302                    "key",
303                    Type::Primitive(PrimitiveType::Int),
304                )),
305                value_field: Arc::new(NestedField::required(
306                    118,
307                    "value",
308                    Type::Primitive(PrimitiveType::Long),
309                )),
310            }),
311        ))
312    })
313};
314
315static VALUE_COUNTS: Lazy<NestedFieldRef> = {
316    Lazy::new(|| {
317        Arc::new(NestedField::optional(
318            109,
319            "value_counts",
320            Type::Map(MapType {
321                key_field: Arc::new(NestedField::required(
322                    119,
323                    "key",
324                    Type::Primitive(PrimitiveType::Int),
325                )),
326                value_field: Arc::new(NestedField::required(
327                    120,
328                    "value",
329                    Type::Primitive(PrimitiveType::Long),
330                )),
331            }),
332        ))
333    })
334};
335
336static NULL_VALUE_COUNTS: Lazy<NestedFieldRef> = {
337    Lazy::new(|| {
338        Arc::new(NestedField::optional(
339            110,
340            "null_value_counts",
341            Type::Map(MapType {
342                key_field: Arc::new(NestedField::required(
343                    121,
344                    "key",
345                    Type::Primitive(PrimitiveType::Int),
346                )),
347                value_field: Arc::new(NestedField::required(
348                    122,
349                    "value",
350                    Type::Primitive(PrimitiveType::Long),
351                )),
352            }),
353        ))
354    })
355};
356
357static NAN_VALUE_COUNTS: Lazy<NestedFieldRef> = {
358    Lazy::new(|| {
359        Arc::new(NestedField::optional(
360            137,
361            "nan_value_counts",
362            Type::Map(MapType {
363                key_field: Arc::new(NestedField::required(
364                    138,
365                    "key",
366                    Type::Primitive(PrimitiveType::Int),
367                )),
368                value_field: Arc::new(NestedField::required(
369                    139,
370                    "value",
371                    Type::Primitive(PrimitiveType::Long),
372                )),
373            }),
374        ))
375    })
376};
377
378static LOWER_BOUNDS: Lazy<NestedFieldRef> = {
379    Lazy::new(|| {
380        Arc::new(NestedField::optional(
381            125,
382            "lower_bounds",
383            Type::Map(MapType {
384                key_field: Arc::new(NestedField::required(
385                    126,
386                    "key",
387                    Type::Primitive(PrimitiveType::Int),
388                )),
389                value_field: Arc::new(NestedField::required(
390                    127,
391                    "value",
392                    Type::Primitive(PrimitiveType::Binary),
393                )),
394            }),
395        ))
396    })
397};
398
399static UPPER_BOUNDS: Lazy<NestedFieldRef> = {
400    Lazy::new(|| {
401        Arc::new(NestedField::optional(
402            128,
403            "upper_bounds",
404            Type::Map(MapType {
405                key_field: Arc::new(NestedField::required(
406                    129,
407                    "key",
408                    Type::Primitive(PrimitiveType::Int),
409                )),
410                value_field: Arc::new(NestedField::required(
411                    130,
412                    "value",
413                    Type::Primitive(PrimitiveType::Binary),
414                )),
415            }),
416        ))
417    })
418};
419
420static KEY_METADATA: Lazy<NestedFieldRef> = {
421    Lazy::new(|| {
422        Arc::new(NestedField::optional(
423            131,
424            "key_metadata",
425            Type::Primitive(PrimitiveType::Binary),
426        ))
427    })
428};
429
430static SPLIT_OFFSETS: Lazy<NestedFieldRef> = {
431    Lazy::new(|| {
432        Arc::new(NestedField::optional(
433            132,
434            "split_offsets",
435            Type::List(ListType {
436                element_field: Arc::new(NestedField::required(
437                    133,
438                    "element",
439                    Type::Primitive(PrimitiveType::Long),
440                )),
441            }),
442        ))
443    })
444};
445
446static EQUALITY_IDS: Lazy<NestedFieldRef> = {
447    Lazy::new(|| {
448        Arc::new(NestedField::optional(
449            135,
450            "equality_ids",
451            Type::List(ListType {
452                element_field: Arc::new(NestedField::required(
453                    136,
454                    "element",
455                    Type::Primitive(PrimitiveType::Int),
456                )),
457            }),
458        ))
459    })
460};
461
462static SORT_ORDER_ID: Lazy<NestedFieldRef> = {
463    Lazy::new(|| {
464        Arc::new(NestedField::optional(
465            140,
466            "sort_order_id",
467            Type::Primitive(PrimitiveType::Int),
468        ))
469    })
470};
471
472static FIRST_ROW_ID: Lazy<NestedFieldRef> = {
473    Lazy::new(|| {
474        Arc::new(NestedField::optional(
475            142,
476            "first_row_id",
477            Type::Primitive(PrimitiveType::Long),
478        ))
479    })
480};
481
482static REFERENCE_DATA_FILE: Lazy<NestedFieldRef> = {
483    Lazy::new(|| {
484        Arc::new(NestedField::optional(
485            143,
486            "referenced_data_file",
487            Type::Primitive(PrimitiveType::String),
488        ))
489    })
490};
491
492static CONTENT_OFFSET: Lazy<NestedFieldRef> = {
493    Lazy::new(|| {
494        Arc::new(NestedField::optional(
495            144,
496            "content_offset",
497            Type::Primitive(PrimitiveType::Long),
498        ))
499    })
500};
501
502static CONTENT_SIZE_IN_BYTES: Lazy<NestedFieldRef> = {
503    Lazy::new(|| {
504        Arc::new(NestedField::optional(
505            145,
506            "content_size_in_bytes",
507            Type::Primitive(PrimitiveType::Long),
508        ))
509    })
510};
511
512fn data_file_fields_v3(partition_type: &StructType) -> Vec<NestedFieldRef> {
513    vec![
514        CONTENT.clone(),
515        FILE_PATH.clone(),
516        FILE_FORMAT.clone(),
517        Arc::new(NestedField::required(
518            102,
519            "partition",
520            Type::Struct(partition_type.clone()),
521        )),
522        RECORD_COUNT.clone(),
523        FILE_SIZE_IN_BYTES.clone(),
524        COLUMN_SIZES.clone(),
525        VALUE_COUNTS.clone(),
526        NULL_VALUE_COUNTS.clone(),
527        NAN_VALUE_COUNTS.clone(),
528        LOWER_BOUNDS.clone(),
529        UPPER_BOUNDS.clone(),
530        KEY_METADATA.clone(),
531        SPLIT_OFFSETS.clone(),
532        EQUALITY_IDS.clone(),
533        SORT_ORDER_ID.clone(),
534        FIRST_ROW_ID.clone(),
535        REFERENCE_DATA_FILE.clone(),
536        CONTENT_OFFSET.clone(),
537        CONTENT_SIZE_IN_BYTES.clone(),
538    ]
539}
540
541pub(super) fn data_file_schema_v3(partition_type: &StructType) -> Result<AvroSchema> {
542    let schema = Schema::builder()
543        .with_fields(data_file_fields_v3(partition_type))
544        .build()?;
545    schema_to_avro_schema("data_file", &schema)
546}
547
548fn data_file_fields_v2(partition_type: &StructType) -> Vec<NestedFieldRef> {
549    vec![
550        CONTENT.clone(),
551        FILE_PATH.clone(),
552        FILE_FORMAT.clone(),
553        Arc::new(NestedField::required(
554            102,
555            "partition",
556            Type::Struct(partition_type.clone()),
557        )),
558        RECORD_COUNT.clone(),
559        FILE_SIZE_IN_BYTES.clone(),
560        COLUMN_SIZES.clone(),
561        VALUE_COUNTS.clone(),
562        NULL_VALUE_COUNTS.clone(),
563        NAN_VALUE_COUNTS.clone(),
564        LOWER_BOUNDS.clone(),
565        UPPER_BOUNDS.clone(),
566        KEY_METADATA.clone(),
567        SPLIT_OFFSETS.clone(),
568        EQUALITY_IDS.clone(),
569        SORT_ORDER_ID.clone(),
570        FIRST_ROW_ID.clone(),
571        REFERENCE_DATA_FILE.clone(),
572        // Why are the following two fields here in the existing v2 schema?
573        // In the spec, they are not even listed as optional for v2.
574        CONTENT_OFFSET.clone(),
575        CONTENT_SIZE_IN_BYTES.clone(),
576    ]
577}
578
579pub(super) fn data_file_schema_v2(partition_type: &StructType) -> Result<AvroSchema> {
580    let schema = Schema::builder()
581        .with_fields(data_file_fields_v2(partition_type))
582        .build()?;
583    schema_to_avro_schema("data_file", &schema)
584}
585
586pub(super) fn manifest_schema_v2(partition_type: &StructType) -> Result<AvroSchema> {
587    let fields = vec![
588        STATUS.clone(),
589        SNAPSHOT_ID_V2.clone(),
590        SEQUENCE_NUMBER.clone(),
591        FILE_SEQUENCE_NUMBER.clone(),
592        Arc::new(NestedField::required(
593            2,
594            "data_file",
595            Type::Struct(StructType::new(data_file_fields_v2(partition_type))),
596        )),
597    ];
598    let schema = Schema::builder().with_fields(fields).build()?;
599    schema_to_avro_schema("manifest_entry", &schema)
600}
601
602fn data_file_fields_v1(partition_type: &StructType) -> Vec<NestedFieldRef> {
603    vec![
604        FILE_PATH.clone(),
605        FILE_FORMAT.clone(),
606        Arc::new(NestedField::required(
607            102,
608            "partition",
609            Type::Struct(partition_type.clone()),
610        )),
611        RECORD_COUNT.clone(),
612        FILE_SIZE_IN_BYTES.clone(),
613        BLOCK_SIZE_IN_BYTES.clone(),
614        COLUMN_SIZES.clone(),
615        VALUE_COUNTS.clone(),
616        NULL_VALUE_COUNTS.clone(),
617        NAN_VALUE_COUNTS.clone(),
618        LOWER_BOUNDS.clone(),
619        UPPER_BOUNDS.clone(),
620        KEY_METADATA.clone(),
621        SPLIT_OFFSETS.clone(),
622        SORT_ORDER_ID.clone(),
623    ]
624}
625
626pub(super) fn data_file_schema_v1(partition_type: &StructType) -> Result<AvroSchema> {
627    let schema = Schema::builder()
628        .with_fields(data_file_fields_v1(partition_type))
629        .build()?;
630    schema_to_avro_schema("data_file", &schema)
631}
632
633pub(super) fn manifest_schema_v1(partition_type: &StructType) -> Result<AvroSchema> {
634    let fields = vec![
635        STATUS.clone(),
636        SNAPSHOT_ID_V1.clone(),
637        Arc::new(NestedField::required(
638            2,
639            "data_file",
640            Type::Struct(StructType::new(data_file_fields_v1(partition_type))),
641        )),
642    ];
643    let schema = Schema::builder().with_fields(fields).build()?;
644    schema_to_avro_schema("manifest_entry", &schema)
645}