iceberg/spec/manifest/data_file.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::io::{Read, Write};
use std::str::FromStr;

use apache_avro::{Reader as AvroReader, Writer as AvroWriter, from_value, to_value};
use serde_derive::{Deserialize, Serialize};
use serde_with::{DeserializeFromStr, SerializeDisplay};

use super::_serde::DataFileSerde;
use super::{
    Datum, FormatVersion, Schema, data_file_schema_v1, data_file_schema_v2, data_file_schema_v3,
};
use crate::error::Result;
use crate::spec::{DEFAULT_PARTITION_SPEC_ID, Struct, StructType};
use crate::{Error, ErrorKind};
/// Data file carries data file path, partition tuple, metrics, …
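///
/// # Example
///
/// A minimal sketch of building a `DataFile` through the
/// `derive_builder`-generated `DataFileBuilder`; the path and metric values
/// below are placeholders:
///
/// ```ignore
/// let data_file = DataFileBuilder::default()
///     .content(DataContentType::Data)
///     .file_path("s3://bucket/warehouse/t/data/00000-0.parquet".to_string())
///     .file_format(DataFileFormat::Parquet)
///     .record_count(100)
///     .file_size_in_bytes(1024)
///     .build()?;
/// ```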
#[derive(Debug, PartialEq, Clone, Eq, Builder)]
pub struct DataFile {
    /// field id: 134
    ///
    /// Type of content stored by the data file: data, equality deletes,
    /// or position deletes (all v1 files are data files)
    pub(crate) content: DataContentType,
    /// field id: 100
    ///
    /// Full URI for the file with FS scheme
    pub(crate) file_path: String,
    /// field id: 101
    ///
    /// String file format name, `avro`, `orc`, `parquet`, or `puffin`
    pub(crate) file_format: DataFileFormat,
    /// field id: 102
    ///
    /// Partition data tuple, schema based on the partition spec output using
    /// partition field ids for the struct field ids
    #[builder(default = "Struct::empty()")]
    pub(crate) partition: Struct,
    /// field id: 103
    ///
    /// Number of records in this file, or the cardinality of a deletion vector
    pub(crate) record_count: u64,
    /// field id: 104
    ///
    /// Total file size in bytes
    pub(crate) file_size_in_bytes: u64,
    /// field id: 108
    /// key field id: 117
    /// value field id: 118
    ///
    /// Map from column id to the total size on disk of all regions that
    /// store the column. Does not include bytes necessary to read other
    /// columns, like footers. Leave null for row-oriented formats (Avro)
    #[builder(default)]
    pub(crate) column_sizes: HashMap<i32, u64>,
    /// field id: 109
    /// key field id: 119
    /// value field id: 120
    ///
    /// Map from column id to number of values in the column (including null
    /// and NaN values)
    #[builder(default)]
    pub(crate) value_counts: HashMap<i32, u64>,
    /// field id: 110
    /// key field id: 121
    /// value field id: 122
    ///
    /// Map from column id to number of null values in the column
    #[builder(default)]
    pub(crate) null_value_counts: HashMap<i32, u64>,
    /// field id: 137
    /// key field id: 138
    /// value field id: 139
    ///
    /// Map from column id to number of NaN values in the column
    #[builder(default)]
    pub(crate) nan_value_counts: HashMap<i32, u64>,
    /// field id: 125
    /// key field id: 126
    /// value field id: 127
    ///
    /// Map from column id to lower bound in the column serialized as binary.
    /// Each value must be less than or equal to all non-null, non-NaN values
    /// in the column for the file.
    ///
    /// Reference:
    ///
    /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
    #[builder(default)]
    pub(crate) lower_bounds: HashMap<i32, Datum>,
    /// field id: 128
    /// key field id: 129
    /// value field id: 130
    ///
    /// Map from column id to upper bound in the column serialized as binary.
    /// Each value must be greater than or equal to all non-null, non-NaN
    /// values in the column for the file.
    ///
    /// Reference:
    ///
    /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
    #[builder(default)]
    pub(crate) upper_bounds: HashMap<i32, Datum>,
    /// field id: 131
    ///
    /// Implementation-specific key metadata for encryption
    #[builder(default)]
    pub(crate) key_metadata: Option<Vec<u8>>,
    /// field id: 132
    /// element field id: 133
    ///
    /// Split offsets for the data file. For example, all row group offsets
    /// in a Parquet file. Must be sorted ascending
    #[builder(default)]
    pub(crate) split_offsets: Vec<i64>,
    /// field id: 135
    /// element field id: 136
    ///
    /// Field ids used to determine row equality in equality delete files.
    /// Required when content is EqualityDeletes and should be null
    /// otherwise. Fields with ids listed in this column must be present
    /// in the delete file
    #[builder(default)]
    pub(crate) equality_ids: Option<Vec<i32>>,
    /// field id: 140
    ///
    /// ID representing sort order for this file.
    ///
    /// If sort order ID is missing or unknown, then the order is assumed to
    /// be unsorted. Only data files and equality delete files should be
    /// written with a non-null order id. Position deletes are required to be
    /// sorted by file and position, not a table order, and should set sort
    /// order id to null. Readers must ignore sort order id for position
    /// delete files.
    #[builder(default, setter(strip_option))]
    pub(crate) sort_order_id: Option<i32>,
    /// field id: 142
    ///
    /// The `_row_id` for the first row in the data file.
    /// For more details, refer to <https://github.com/apache/iceberg/blob/main/format/spec.md#first-row-id-inheritance>
    #[builder(default)]
    pub(crate) first_row_id: Option<i64>,
    /// This field is not part of the spec; it is only kept in the in-memory
    /// representation used during processing.
    #[builder(default = "DEFAULT_PARTITION_SPEC_ID")]
    pub(crate) partition_spec_id: i32,
    /// field id: 143
    ///
    /// Fully qualified location (URI with FS scheme) of a data file that all deletes reference.
    /// Position delete metadata can use `referenced_data_file` when all deletes tracked by the
    /// entry are in a single data file. Setting the referenced file is required for deletion vectors.
    #[builder(default)]
    pub(crate) referenced_data_file: Option<String>,
    /// field id: 144
    ///
    /// The offset in the file where the content starts.
    /// The `content_offset` and `content_size_in_bytes` fields are used to reference a specific blob
    /// for direct access to a deletion vector. For deletion vectors, these values are required and must
    /// exactly match the `offset` and `length` stored in the Puffin footer for the deletion vector blob.
    #[builder(default)]
    pub(crate) content_offset: Option<i64>,
    /// field id: 145
    ///
    /// The length of the referenced content stored in the file; required if `content_offset` is present
    #[builder(default)]
    pub(crate) content_size_in_bytes: Option<i64>,
}

impl DataFile {
    /// Get the content type of the data file (data, equality deletes, or position deletes)
    pub fn content_type(&self) -> DataContentType {
        self.content
    }
    /// Get the file path as full URI with FS scheme
    pub fn file_path(&self) -> &str {
        &self.file_path
    }
    /// Get the file format of the file (avro, orc, parquet, or puffin).
    pub fn file_format(&self) -> DataFileFormat {
        self.file_format
    }
    /// Get the partition values of the file.
    pub fn partition(&self) -> &Struct {
        &self.partition
    }
    /// Get the record count in the data file.
    pub fn record_count(&self) -> u64 {
        self.record_count
    }
    /// Get the file size in bytes.
    pub fn file_size_in_bytes(&self) -> u64 {
        self.file_size_in_bytes
    }
    /// Get the column sizes.
    /// Map from column id to the total size on disk of all regions that
    /// store the column. Does not include bytes necessary to read other
    /// columns, like footers. Null for row-oriented formats (Avro)
    pub fn column_sizes(&self) -> &HashMap<i32, u64> {
        &self.column_sizes
    }
    /// Get the columns value counts for the data file.
    /// Map from column id to number of values in the column (including null
    /// and NaN values)
    pub fn value_counts(&self) -> &HashMap<i32, u64> {
        &self.value_counts
    }
    /// Get the null value counts of the data file.
    /// Map from column id to number of null values in the column
    pub fn null_value_counts(&self) -> &HashMap<i32, u64> {
        &self.null_value_counts
    }
    /// Get the NaN value counts of the data file.
    /// Map from column id to number of NaN values in the column
    pub fn nan_value_counts(&self) -> &HashMap<i32, u64> {
        &self.nan_value_counts
    }
    /// Get the lower bounds of the data file values per column.
    /// Map from column id to lower bound in the column serialized as binary.
    pub fn lower_bounds(&self) -> &HashMap<i32, Datum> {
        &self.lower_bounds
    }
    /// Get the upper bounds of the data file values per column.
    /// Map from column id to upper bound in the column serialized as binary.
    pub fn upper_bounds(&self) -> &HashMap<i32, Datum> {
        &self.upper_bounds
    }
    /// Get the implementation-specific key metadata for the data file.
    pub fn key_metadata(&self) -> Option<&[u8]> {
        self.key_metadata.as_deref()
    }
    /// Get the split offsets of the data file.
    /// For example, all row group offsets in a Parquet file.
    pub fn split_offsets(&self) -> &[i64] {
        &self.split_offsets
    }
    /// Get the equality ids of the data file.
    /// Field ids used to determine row equality in equality delete files.
    /// Returns `None` when content is not `EqualityDeletes`.
    pub fn equality_ids(&self) -> Option<Vec<i32>> {
        self.equality_ids.clone()
    }
    /// Get the first row id in the data file.
    pub fn first_row_id(&self) -> Option<i64> {
        self.first_row_id
    }
    /// Get the sort order id of the data file.
    /// Only data files and equality delete files should be
    /// written with a non-null order id. Position deletes are required to be
    /// sorted by file and position, not a table order, and should set sort
    /// order id to null. Readers must ignore sort order id for position
    /// delete files.
    pub fn sort_order_id(&self) -> Option<i32> {
        self.sort_order_id
    }
    /// Get the fully qualified referenced location for the corresponding data file.
    /// Position delete files may have the field set, and deletion vectors must have the field set.
    pub fn referenced_data_file(&self) -> Option<String> {
        self.referenced_data_file.clone()
    }
    /// Get the offset in the file where the blob content starts.
    /// Only meaningful for Puffin blobs, and required for deletion vectors.
    pub fn content_offset(&self) -> Option<i64> {
        self.content_offset
    }
    /// Get the length of a Puffin blob.
    /// Only meaningful for Puffin blobs, and required for deletion vectors.
    pub fn content_size_in_bytes(&self) -> Option<i64> {
        self.content_size_in_bytes
    }
}

/// Convert data files to Avro and write them to `writer`.
/// Returns the number of bytes written.
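///
/// # Example
///
/// A minimal sketch, assuming `data_file` and `partition_type` were built
/// elsewhere (both names are placeholders):
///
/// ```ignore
/// let mut buf = Vec::new();
/// let bytes_written = write_data_files_to_avro(
///     &mut buf,
///     vec![data_file],
///     &partition_type,
///     FormatVersion::V2,
/// )?;
/// ```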
pub fn write_data_files_to_avro<W: Write>(
    writer: &mut W,
    data_files: impl IntoIterator<Item = DataFile>,
    partition_type: &StructType,
    version: FormatVersion,
) -> Result<usize> {
    let avro_schema = match version {
        FormatVersion::V1 => data_file_schema_v1(partition_type)?,
        FormatVersion::V2 => data_file_schema_v2(partition_type)?,
        FormatVersion::V3 => data_file_schema_v3(partition_type)?,
    };
    let mut writer = AvroWriter::new(&avro_schema, writer);

    for data_file in data_files {
        // Serialize with the same format version that was used to pick the
        // Avro schema above (the original hardcoded `FormatVersion::V1` here).
        let value = to_value(DataFileSerde::try_from(
            data_file,
            partition_type,
            version,
        )?)?
        .resolve(&avro_schema)?;
        writer.append(value)?;
    }

    Ok(writer.flush()?)
}

/// Parse data files from Avro bytes.
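///
/// # Example
///
/// A minimal sketch, assuming `buf` holds bytes produced by
/// `write_data_files_to_avro` and that `schema` and `partition_type` match
/// the ones used when writing (all placeholder names):
///
/// ```ignore
/// let data_files = read_data_files_from_avro(
///     &mut buf.as_slice(),
///     &schema,
///     DEFAULT_PARTITION_SPEC_ID,
///     &partition_type,
///     FormatVersion::V2,
/// )?;
/// ```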
pub fn read_data_files_from_avro<R: Read>(
    reader: &mut R,
    schema: &Schema,
    partition_spec_id: i32,
    partition_type: &StructType,
    version: FormatVersion,
) -> Result<Vec<DataFile>> {
    let avro_schema = match version {
        FormatVersion::V1 => data_file_schema_v1(partition_type)?,
        FormatVersion::V2 => data_file_schema_v2(partition_type)?,
        FormatVersion::V3 => data_file_schema_v3(partition_type)?,
    };

    let reader = AvroReader::with_schema(&avro_schema, reader)?;
    reader
        .into_iter()
        .map(|value| {
            from_value::<DataFileSerde>(&value?)?.try_into(
                partition_spec_id,
                partition_type,
                schema,
            )
        })
        .collect::<Result<Vec<_>>>()
}

/// Type of content stored by the data file: data, equality deletes, or
/// position deletes (all v1 files are data files)
#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, Default)]
pub enum DataContentType {
    /// value: 0
    #[default]
    Data = 0,
    /// value: 1
    PositionDeletes = 1,
    /// value: 2
    EqualityDeletes = 2,
}

impl TryFrom<i32> for DataContentType {
    type Error = Error;

    fn try_from(v: i32) -> Result<DataContentType> {
        match v {
            0 => Ok(DataContentType::Data),
            1 => Ok(DataContentType::PositionDeletes),
            2 => Ok(DataContentType::EqualityDeletes),
            _ => Err(Error::new(
                ErrorKind::DataInvalid,
                format!("data content type {v} is invalid"),
            )),
        }
    }
}

/// Format of this data.
#[derive(Debug, PartialEq, Eq, Clone, Copy, SerializeDisplay, DeserializeFromStr)]
pub enum DataFileFormat {
    /// Avro file format: <https://avro.apache.org/>
    Avro,
    /// Orc file format: <https://orc.apache.org/>
    Orc,
    /// Parquet file format: <https://parquet.apache.org/>
    Parquet,
    /// Puffin file format: <https://iceberg.apache.org/puffin-spec/>
    Puffin,
}

impl FromStr for DataFileFormat {
    type Err = Error;

    fn from_str(s: &str) -> Result<Self> {
        match s.to_lowercase().as_str() {
            "avro" => Ok(Self::Avro),
            "orc" => Ok(Self::Orc),
            "parquet" => Ok(Self::Parquet),
            "puffin" => Ok(Self::Puffin),
            _ => Err(Error::new(
                ErrorKind::DataInvalid,
                format!("Unsupported data file format: {s}"),
            )),
        }
    }
}

impl std::fmt::Display for DataFileFormat {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DataFileFormat::Avro => write!(f, "avro"),
            DataFileFormat::Orc => write!(f, "orc"),
            DataFileFormat::Parquet => write!(f, "parquet"),
            DataFileFormat::Puffin => write!(f, "puffin"),
        }
    }
}

#[cfg(test)]
mod test {
    use crate::spec::DataContentType;

    #[test]
    fn test_data_content_type_default() {
        assert_eq!(DataContentType::default(), DataContentType::Data);
    }

    #[test]
    fn test_data_content_type_default_value() {
        assert_eq!(DataContentType::default() as i32, 0);
    }
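
    // A round-trip sketch for `DataFileFormat`: `FromStr` is case-insensitive,
    // while `Display` always prints lowercase.
    #[test]
    fn test_data_file_format_round_trip() {
        use std::str::FromStr;

        use super::DataFileFormat;

        for (s, format) in [
            ("avro", DataFileFormat::Avro),
            ("orc", DataFileFormat::Orc),
            ("parquet", DataFileFormat::Parquet),
            ("puffin", DataFileFormat::Puffin),
        ] {
            assert_eq!(DataFileFormat::from_str(s).unwrap(), format);
            assert_eq!(DataFileFormat::from_str(&s.to_uppercase()).unwrap(), format);
            assert_eq!(format.to_string(), s);
        }
        assert!(DataFileFormat::from_str("csv").is_err());
    }

    // A sketch exercising `DataContentType::try_from` for all valid codes and
    // one invalid code.
    #[test]
    fn test_data_content_type_try_from() {
        assert_eq!(DataContentType::try_from(0).unwrap(), DataContentType::Data);
        assert_eq!(
            DataContentType::try_from(1).unwrap(),
            DataContentType::PositionDeletes
        );
        assert_eq!(
            DataContentType::try_from(2).unwrap(),
            DataContentType::EqualityDeletes
        );
        assert!(DataContentType::try_from(3).is_err());
    }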
}