iceberg/spec/manifest/data_file.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::io::{Read, Write};
use std::str::FromStr;

use apache_avro::{Reader as AvroReader, Writer as AvroWriter, from_value, to_value};
use serde_derive::{Deserialize, Serialize};
use serde_with::{DeserializeFromStr, SerializeDisplay};

use super::_serde::DataFileSerde;
use super::{
    Datum, FormatVersion, Schema, data_file_schema_v1, data_file_schema_v2, data_file_schema_v3,
};
use crate::error::Result;
use crate::spec::{DEFAULT_PARTITION_SPEC_ID, Struct, StructType};
use crate::{Error, ErrorKind};

/// Data file carries data file path, partition tuple, metrics, …
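///
/// A minimal construction sketch using the `derive_builder`-generated
/// `DataFileBuilder`; the field values are illustrative only, and the example
/// is `ignore`d rather than compiled as a doc test:
///
/// ```ignore
/// let data_file = DataFileBuilder::default()
///     .content(DataContentType::Data)
///     .file_path("s3://bucket/path/data-00000.parquet".to_string())
///     .file_format(DataFileFormat::Parquet)
///     .record_count(100)
///     .file_size_in_bytes(1024)
///     .build()
///     .unwrap();
/// ```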
#[derive(Debug, PartialEq, Clone, Eq, Builder)]
pub struct DataFile {
    /// field id: 134
    ///
    /// Type of content stored by the data file: data, equality deletes,
    /// or position deletes (all v1 files are data files)
    pub(crate) content: DataContentType,
    /// field id: 100
    ///
    /// Full URI for the file with FS scheme
    pub(crate) file_path: String,
    /// field id: 101
    ///
    /// String file format name, `avro`, `orc`, `parquet`, or `puffin`
    pub(crate) file_format: DataFileFormat,
    /// field id: 102
    ///
    /// Partition data tuple, schema based on the partition spec output using
    /// partition field ids for the struct field ids
    #[builder(default = "Struct::empty()")]
    pub(crate) partition: Struct,
    /// field id: 103
    ///
    /// Number of records in this file, or the cardinality of a deletion vector
    pub(crate) record_count: u64,
    /// field id: 104
    ///
    /// Total file size in bytes
    pub(crate) file_size_in_bytes: u64,
    /// field id: 108
    /// key field id: 117
    /// value field id: 118
    ///
    /// Map from column id to the total size on disk of all regions that
    /// store the column. Does not include bytes necessary to read other
    /// columns, like footers. Leave null for row-oriented formats (Avro)
    #[builder(default)]
    pub(crate) column_sizes: HashMap<i32, u64>,
    /// field id: 109
    /// key field id: 119
    /// value field id: 120
    ///
    /// Map from column id to number of values in the column (including null
    /// and NaN values)
    #[builder(default)]
    pub(crate) value_counts: HashMap<i32, u64>,
    /// field id: 110
    /// key field id: 121
    /// value field id: 122
    ///
    /// Map from column id to number of null values in the column
    #[builder(default)]
    pub(crate) null_value_counts: HashMap<i32, u64>,
    /// field id: 137
    /// key field id: 138
    /// value field id: 139
    ///
    /// Map from column id to number of NaN values in the column
    #[builder(default)]
    pub(crate) nan_value_counts: HashMap<i32, u64>,
    /// field id: 125
    /// key field id: 126
    /// value field id: 127
    ///
    /// Map from column id to lower bound in the column serialized as binary.
    /// Each value must be less than or equal to all non-null, non-NaN values
    /// in the column for the file.
    ///
    /// Reference:
    ///
    /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
    #[builder(default)]
    pub(crate) lower_bounds: HashMap<i32, Datum>,
    /// field id: 128
    /// key field id: 129
    /// value field id: 130
    ///
    /// Map from column id to upper bound in the column serialized as binary.
    /// Each value must be greater than or equal to all non-null, non-NaN
    /// values in the column for the file.
    ///
    /// Reference:
    ///
    /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
    #[builder(default)]
    pub(crate) upper_bounds: HashMap<i32, Datum>,
    /// field id: 131
    ///
    /// Implementation-specific key metadata for encryption
    #[builder(default)]
    pub(crate) key_metadata: Option<Vec<u8>>,
    /// field id: 132
    /// element field id: 133
    ///
    /// Split offsets for the data file. For example, all row group offsets
    /// in a Parquet file. Must be sorted ascending
    #[builder(default)]
    pub(crate) split_offsets: Vec<i64>,
    /// field id: 135
    /// element field id: 136
    ///
    /// Field ids used to determine row equality in equality delete files.
    /// Required when content is EqualityDeletes and should be null
    /// otherwise. Fields with ids listed in this column must be present
    /// in the delete file
    #[builder(default)]
    pub(crate) equality_ids: Option<Vec<i32>>,
    /// field id: 140
    ///
    /// ID representing sort order for this file.
    ///
    /// If sort order ID is missing or unknown, then the order is assumed to
    /// be unsorted. Only data files and equality delete files should be
    /// written with a non-null order id. Position deletes are required to be
    /// sorted by file and position, not a table order, and should set sort
    /// order id to null. Readers must ignore sort order id for position
    /// delete files.
    #[builder(default, setter(strip_option))]
    pub(crate) sort_order_id: Option<i32>,
    /// field id: 142
    ///
    /// The `_row_id` for the first row in the data file.
    /// For more details, refer to <https://github.com/apache/iceberg/blob/main/format/spec.md#first-row-id-inheritance>.
    #[builder(default)]
    pub(crate) first_row_id: Option<i64>,
    /// This field is not included in the spec; it is only part of the in-memory
    /// representation used during processing.
    #[builder(default = "DEFAULT_PARTITION_SPEC_ID")]
    pub(crate) partition_spec_id: i32,
    /// field id: 143
    ///
    /// Fully qualified location (URI with FS scheme) of a data file that all deletes reference.
    /// Position delete metadata can use `referenced_data_file` when all deletes tracked by the
    /// entry are in a single data file. Setting the referenced file is required for deletion vectors.
    #[builder(default)]
    pub(crate) referenced_data_file: Option<String>,
    /// field id: 144
    ///
    /// The offset in the file where the content starts.
    /// The `content_offset` and `content_size_in_bytes` fields are used to reference a specific blob
    /// for direct access to a deletion vector. For deletion vectors, these values are required and must
    /// exactly match the `offset` and `length` stored in the Puffin footer for the deletion vector blob.
    #[builder(default)]
    pub(crate) content_offset: Option<i64>,
    /// field id: 145
    ///
    /// The length of the referenced content stored in the file; required if `content_offset` is present
    #[builder(default)]
    pub(crate) content_size_in_bytes: Option<i64>,
}

impl DataFile {
    /// Get the content type of the data file (data, equality deletes, or position deletes)
    pub fn content_type(&self) -> DataContentType {
        self.content
    }

    /// Get the file path as full URI with FS scheme
    pub fn file_path(&self) -> &str {
        &self.file_path
    }

    /// Get the file format of the file (avro, orc, parquet, or puffin).
    pub fn file_format(&self) -> DataFileFormat {
        self.file_format
    }

    /// Get the partition values of the file.
    pub fn partition(&self) -> &Struct {
        &self.partition
    }

    /// Get the record count in the data file.
    pub fn record_count(&self) -> u64 {
        self.record_count
    }

    /// Get the file size in bytes.
    pub fn file_size_in_bytes(&self) -> u64 {
        self.file_size_in_bytes
    }

    /// Get the column sizes.
    /// Map from column id to the total size on disk of all regions that
    /// store the column. Does not include bytes necessary to read other
    /// columns, like footers. Null for row-oriented formats (Avro)
    pub fn column_sizes(&self) -> &HashMap<i32, u64> {
        &self.column_sizes
    }

    /// Get the columns value counts for the data file.
    /// Map from column id to number of values in the column (including null
    /// and NaN values)
    pub fn value_counts(&self) -> &HashMap<i32, u64> {
        &self.value_counts
    }

    /// Get the null value counts of the data file.
    /// Map from column id to number of null values in the column
    pub fn null_value_counts(&self) -> &HashMap<i32, u64> {
        &self.null_value_counts
    }

    /// Get the nan value counts of the data file.
    /// Map from column id to number of NaN values in the column
    pub fn nan_value_counts(&self) -> &HashMap<i32, u64> {
        &self.nan_value_counts
    }

    /// Get the lower bounds of the data file values per column.
    /// Map from column id to lower bound in the column serialized as binary.
    pub fn lower_bounds(&self) -> &HashMap<i32, Datum> {
        &self.lower_bounds
    }

    /// Get the upper bounds of the data file values per column.
    /// Map from column id to upper bound in the column serialized as binary.
    pub fn upper_bounds(&self) -> &HashMap<i32, Datum> {
        &self.upper_bounds
    }

    /// Get the implementation-specific key metadata for the data file.
    pub fn key_metadata(&self) -> Option<&[u8]> {
        self.key_metadata.as_deref()
    }

    /// Get the split offsets of the data file.
    /// For example, all row group offsets in a Parquet file.
    pub fn split_offsets(&self) -> &[i64] {
        &self.split_offsets
    }

    /// Get the equality ids of the data file.
    /// Field ids used to determine row equality in equality delete files.
    /// null when content is not EqualityDeletes.
    pub fn equality_ids(&self) -> Option<Vec<i32>> {
        self.equality_ids.clone()
    }

    /// Get the first row id in the data file.
    pub fn first_row_id(&self) -> Option<i64> {
        self.first_row_id
    }

    /// Get the sort order id of the data file.
    /// Only data files and equality delete files should be
    /// written with a non-null order id. Position deletes are required to be
    /// sorted by file and position, not a table order, and should set sort
    /// order id to null. Readers must ignore sort order id for position
    /// delete files.
    pub fn sort_order_id(&self) -> Option<i32> {
        self.sort_order_id
    }

    /// Get the fully qualified referenced location for the corresponding data file.
    /// Position delete files may have this field set; deletion vectors must have it set.
    pub fn referenced_data_file(&self) -> Option<String> {
        self.referenced_data_file.clone()
    }

    /// Get the offset in the file where the blob content starts.
    /// Only meaningful for puffin blobs, and required for deletion vectors.
    pub fn content_offset(&self) -> Option<i64> {
        self.content_offset
    }

    /// Get the length of a puffin blob.
    /// Only meaningful for puffin blobs, and required for deletion vectors.
    pub fn content_size_in_bytes(&self) -> Option<i64> {
        self.content_size_in_bytes
    }
}

/// Convert data files to Avro and write them to `writer`.
/// Returns the number of bytes written.
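///
/// A usage sketch; `data_file` and the empty partition type are hypothetical,
/// and the example is `ignore`d rather than compiled as a doc test:
///
/// ```ignore
/// let mut buf = Vec::new();
/// let written = write_data_files_to_avro(
///     &mut buf,
///     vec![data_file],
///     &StructType::new(vec![]),
///     FormatVersion::V2,
/// )?;
/// assert!(written > 0);
/// ```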
pub fn write_data_files_to_avro<W: Write>(
    writer: &mut W,
    data_files: impl IntoIterator<Item = DataFile>,
    partition_type: &StructType,
    version: FormatVersion,
) -> Result<usize> {
    let avro_schema = match version {
        FormatVersion::V1 => data_file_schema_v1(partition_type)?,
        FormatVersion::V2 => data_file_schema_v2(partition_type)?,
        FormatVersion::V3 => data_file_schema_v3(partition_type)?,
    };
    let mut writer = AvroWriter::new(&avro_schema, writer);

    for data_file in data_files {
        // Serialize each data file with the same format version used to
        // select the Avro schema above.
        let value = to_value(DataFileSerde::try_from(data_file, partition_type, version)?)?
            .resolve(&avro_schema)?;
        writer.append(value)?;
    }

    Ok(writer.flush()?)
}

/// Parse data files from Avro bytes.
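///
/// A usage sketch mirroring [`write_data_files_to_avro`]; `buf`, `schema`,
/// and the empty partition type are hypothetical, and the example is
/// `ignore`d rather than compiled as a doc test:
///
/// ```ignore
/// let mut cursor = std::io::Cursor::new(buf);
/// let data_files = read_data_files_from_avro(
///     &mut cursor,
///     &schema,
///     DEFAULT_PARTITION_SPEC_ID,
///     &StructType::new(vec![]),
///     FormatVersion::V2,
/// )?;
/// ```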
pub fn read_data_files_from_avro<R: Read>(
    reader: &mut R,
    schema: &Schema,
    partition_spec_id: i32,
    partition_type: &StructType,
    version: FormatVersion,
) -> Result<Vec<DataFile>> {
    let avro_schema = match version {
        FormatVersion::V1 => data_file_schema_v1(partition_type)?,
        FormatVersion::V2 => data_file_schema_v2(partition_type)?,
        FormatVersion::V3 => data_file_schema_v3(partition_type)?,
    };

    let reader = AvroReader::with_schema(&avro_schema, reader)?;
    reader
        .into_iter()
        .map(|value| {
            from_value::<DataFileSerde>(&value?)?.try_into(
                partition_spec_id,
                partition_type,
                schema,
            )
        })
        .collect::<Result<Vec<_>>>()
}

/// Type of content stored by the data file: data, equality deletes, or
/// position deletes (all v1 files are data files)
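///
/// The variants carry the spec's integer codes; a small sketch of the
/// `TryFrom<i32>` conversion implemented below (`ignore`d rather than
/// compiled as a doc test):
///
/// ```ignore
/// assert_eq!(DataContentType::try_from(1).unwrap(), DataContentType::PositionDeletes);
/// assert!(DataContentType::try_from(9).is_err());
/// ```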
#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, Default)]
pub enum DataContentType {
    /// value: 0
    #[default]
    Data = 0,
    /// value: 1
    PositionDeletes = 1,
    /// value: 2
    EqualityDeletes = 2,
}

impl TryFrom<i32> for DataContentType {
    type Error = Error;

    fn try_from(v: i32) -> Result<DataContentType> {
        match v {
            0 => Ok(DataContentType::Data),
            1 => Ok(DataContentType::PositionDeletes),
            2 => Ok(DataContentType::EqualityDeletes),
            _ => Err(Error::new(
                ErrorKind::DataInvalid,
                format!("data content type {v} is invalid"),
            )),
        }
    }
}

/// Format of this data.
#[derive(Debug, PartialEq, Eq, Clone, Copy, SerializeDisplay, DeserializeFromStr)]
pub enum DataFileFormat {
    /// Avro file format: <https://avro.apache.org/>
    Avro,
    /// Orc file format: <https://orc.apache.org/>
    Orc,
    /// Parquet file format: <https://parquet.apache.org/>
    Parquet,
    /// Puffin file format: <https://iceberg.apache.org/puffin-spec/>
    Puffin,
}

impl FromStr for DataFileFormat {
    type Err = Error;

    fn from_str(s: &str) -> Result<Self> {
        match s.to_lowercase().as_str() {
            "avro" => Ok(Self::Avro),
            "orc" => Ok(Self::Orc),
            "parquet" => Ok(Self::Parquet),
            "puffin" => Ok(Self::Puffin),
            _ => Err(Error::new(
                ErrorKind::DataInvalid,
                format!("Unsupported data file format: {s}"),
            )),
        }
    }
}

impl std::fmt::Display for DataFileFormat {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DataFileFormat::Avro => write!(f, "avro"),
            DataFileFormat::Orc => write!(f, "orc"),
            DataFileFormat::Parquet => write!(f, "parquet"),
            DataFileFormat::Puffin => write!(f, "puffin"),
        }
    }
}

#[cfg(test)]
mod test {
    use crate::spec::DataContentType;

    #[test]
    fn test_data_content_type_default() {
        assert_eq!(DataContentType::default(), DataContentType::Data);
    }

    #[test]
    fn test_data_content_type_default_value() {
        assert_eq!(DataContentType::default() as i32, 0);
    }
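
    // A round-trip sketch for `DataFileFormat`: `FromStr` lowercases its
    // input and `Display` always emits the lowercase name, so parsing and
    // formatting should round-trip through the lowercase form.
    #[test]
    fn test_data_file_format_round_trip() {
        use std::str::FromStr;

        use crate::spec::DataFileFormat;

        for (s, expected) in [
            ("avro", DataFileFormat::Avro),
            ("ORC", DataFileFormat::Orc),
            ("Parquet", DataFileFormat::Parquet),
            ("puffin", DataFileFormat::Puffin),
        ] {
            let parsed = DataFileFormat::from_str(s).unwrap();
            assert_eq!(parsed, expected);
            assert_eq!(parsed.to_string(), s.to_lowercase());
        }
        assert!(DataFileFormat::from_str("csv").is_err());
    }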
}