iceberg/spec/manifest_list/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! ManifestList for Iceberg.
19
20mod _const_schema;
21pub(super) mod _serde;
22mod manifest_file;
23mod reader;
24mod writer;
25
26use apache_avro::types::Value;
27use apache_avro::{Reader, from_value};
28pub use manifest_file::*;
29pub use reader::*;
30pub use serde_bytes::ByteBuf;
31pub use writer::*;
32
33use self::_const_schema::MANIFEST_LIST_AVRO_SCHEMA_V1;
34use super::FormatVersion;
35use crate::error::Result;
36
37/// Placeholder for a sequence number. A field holding this value must be replaced with the actual sequence number before it is written.
38pub const UNASSIGNED_SEQUENCE_NUMBER: i64 = -1;
39
40/// Snapshots are embedded in table metadata, but the list of manifests for a
41/// snapshot are stored in a separate manifest list file.
42///
43/// A new manifest list is written for each attempt to commit a snapshot
44/// because the list of manifests always changes to produce a new snapshot.
45/// When a manifest list is written, the (optimistic) sequence number of the
46/// snapshot is written for all new manifest files tracked by the list.
47///
48/// A manifest list includes summary metadata that can be used to avoid
49/// scanning all of the manifests in a snapshot when planning a table scan.
50/// This includes the number of added, existing, and deleted files, and a
51/// summary of values for each field of the partition spec used to write the
52/// manifest.
53#[derive(Debug, Clone, PartialEq)]
54pub struct ManifestList {
55    /// Entries in a manifest list.
56    entries: Vec<ManifestFile>,
57}
58
59impl ManifestList {
60    /// Parse manifest list from bytes.
61    pub fn parse_with_version(bs: &[u8], version: FormatVersion) -> Result<ManifestList> {
62        match version {
63            FormatVersion::V1 => {
64                let reader = Reader::with_schema(&MANIFEST_LIST_AVRO_SCHEMA_V1, bs)?;
65                let values = Value::Array(reader.collect::<std::result::Result<Vec<Value>, _>>()?);
66                from_value::<_serde::ManifestListV1>(&values)?.try_into()
67            }
68            FormatVersion::V2 => {
69                let reader = Reader::new(bs)?;
70                let values = Value::Array(reader.collect::<std::result::Result<Vec<Value>, _>>()?);
71                from_value::<_serde::ManifestListV2>(&values)?.try_into()
72            }
73            FormatVersion::V3 => {
74                let reader = Reader::new(bs)?;
75                let values = Value::Array(reader.collect::<std::result::Result<Vec<Value>, _>>()?);
76                from_value::<_serde::ManifestListV3>(&values)?.try_into()
77            }
78        }
79    }
80
81    /// Get the entries in the manifest list.
82    pub fn entries(&self) -> &[ManifestFile] {
83        &self.entries
84    }
85
86    /// Take ownership of the entries in the manifest list, consuming it
87    pub fn consume_entries(self) -> impl IntoIterator<Item = ManifestFile> {
88        Box::new(self.entries.into_iter())
89    }
90}
91
#[cfg(test)]
mod test {
    use std::fs;

    use apache_avro::{Codec, Writer};
    use tempfile::TempDir;

    use super::_const_schema::MANIFEST_LIST_AVRO_SCHEMA_V2;
    use super::_serde::ManifestFileV2;
    use super::*;
    use crate::io::FileIO;
    use crate::spec::{Datum, FieldSummary, FormatVersion, ManifestContentType, ManifestFile};

    /// Manifest paths shared by the V2 and V3 round-trip fixtures.
    const M0_PATH: &str =
        "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro";
    const M1_PATH: &str =
        "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m1.avro";

    /// Returns the location of `file_name` inside `dir` as a string path.
    fn path_in(dir: &TempDir, file_name: &str) -> String {
        format!("{}/{}", dir.path().to_str().unwrap(), file_name)
    }

    /// Builds a partition field summary without nulls or NaNs and with the
    /// given lower and upper bounds.
    fn bounds_summary(lower: Datum, upper: Datum) -> FieldSummary {
        FieldSummary {
            contains_null: false,
            contains_nan: Some(false),
            lower_bound: Some(lower.to_bytes().unwrap()),
            upper_bound: Some(upper.to_bytes().unwrap()),
        }
    }

    /// Builds the data manifest entry that the V2, V3 and snappy fixtures
    /// have in common. Tests override individual fields with struct update
    /// syntax where their fixture differs.
    fn data_manifest(manifest_path: &str, summary: FieldSummary) -> ManifestFile {
        ManifestFile {
            manifest_path: manifest_path.to_string(),
            manifest_length: 6926,
            partition_spec_id: 1,
            content: ManifestContentType::Data,
            sequence_number: 1,
            min_sequence_number: 1,
            added_snapshot_id: 377075049360453639,
            added_files_count: Some(1),
            existing_files_count: Some(0),
            deleted_files_count: Some(0),
            added_rows_count: Some(3),
            existing_rows_count: Some(0),
            deleted_rows_count: Some(0),
            partitions: Some(vec![summary]),
            key_metadata: None,
            first_row_id: None,
        }
    }

    /// Parses `bs` as a manifest list of `version` and asserts that the
    /// result equals `expected`.
    fn assert_parses_to(bs: &[u8], version: FormatVersion, expected: &ManifestList) {
        let parsed_manifest_list = ManifestList::parse_with_version(bs, version).unwrap();
        assert_eq!(expected, &parsed_manifest_list);
    }

    /// Reads the file at `path` and checks it with [`assert_parses_to`].
    fn assert_file_parses_to(path: &str, version: FormatVersion, expected: &ManifestList) {
        let bs = fs::read(path).expect("read_file must succeed");
        assert_parses_to(&bs, version, expected);
    }

    #[tokio::test]
    async fn test_parse_manifest_list_v1() {
        let manifest_list = ManifestList {
            entries: vec![ManifestFile {
                manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
                manifest_length: 5806,
                partition_spec_id: 0,
                content: ManifestContentType::Data,
                sequence_number: 0,
                min_sequence_number: 0,
                added_snapshot_id: 1646658105718557341,
                added_files_count: Some(3),
                existing_files_count: Some(0),
                deleted_files_count: Some(0),
                added_rows_count: Some(3),
                existing_rows_count: Some(0),
                deleted_rows_count: Some(0),
                partitions: Some(vec![]),
                key_metadata: None,
                first_row_id: None,
            }],
        };

        let file_io = FileIO::new_with_fs();
        let tmp_dir = TempDir::new().unwrap();
        let full_path = path_in(&tmp_dir, "simple_manifest_list_v1.avro");

        let mut writer = ManifestListWriter::v1(
            file_io
                .new_output(full_path.clone())
                .unwrap()
                .writer()
                .await
                .unwrap(),
            1646658105718557341,
            Some(1646658105718557341),
        );

        writer
            .add_manifests(manifest_list.entries.clone().into_iter())
            .unwrap();
        writer.close().await.unwrap();

        assert_file_parses_to(&full_path, FormatVersion::V1, &manifest_list);
    }

    #[tokio::test]
    async fn test_parse_manifest_list_v2() {
        let manifest_list = ManifestList {
            entries: vec![
                data_manifest(M0_PATH, bounds_summary(Datum::long(1), Datum::long(1))),
                ManifestFile {
                    partition_spec_id: 2,
                    ..data_manifest(
                        M1_PATH,
                        bounds_summary(Datum::float(1.1), Datum::float(2.1)),
                    )
                },
            ],
        };

        let file_io = FileIO::new_with_fs();
        let tmp_dir = TempDir::new().unwrap();
        // Named after the format version under test.
        let full_path = path_in(&tmp_dir, "simple_manifest_list_v2.avro");

        let mut writer = ManifestListWriter::v2(
            file_io
                .new_output(full_path.clone())
                .unwrap()
                .writer()
                .await
                .unwrap(),
            1646658105718557341,
            Some(1646658105718557341),
            1,
        );

        writer
            .add_manifests(manifest_list.entries.clone().into_iter())
            .unwrap();
        writer.close().await.unwrap();

        assert_file_parses_to(&full_path, FormatVersion::V2, &manifest_list);
    }

    #[test]
    fn test_parse_snappy_manifest_list_v2() {
        let manifest_list = ManifestList {
            entries: vec![data_manifest(
                "s3a://icebergdata/demo/s1/t1/metadata/snappy-m0.avro",
                bounds_summary(Datum::long(1), Datum::long(1)),
            )],
        };

        // Write the single entry straight through the Avro writer so that the
        // snappy codec is exercised when parsing.
        let manifest_entry: ManifestFileV2 = manifest_list.entries[0].clone().try_into().unwrap();
        let mut writer =
            Writer::with_codec(&MANIFEST_LIST_AVRO_SCHEMA_V2, Vec::new(), Codec::Snappy);
        writer.append_ser(manifest_entry).unwrap();
        let bs = writer.into_inner().unwrap();

        assert_parses_to(&bs, FormatVersion::V2, &manifest_list);
    }

    #[tokio::test]
    async fn test_parse_manifest_list_v3() {
        let manifest_list = ManifestList {
            entries: vec![
                ManifestFile {
                    first_row_id: Some(10),
                    ..data_manifest(M0_PATH, bounds_summary(Datum::long(1), Datum::long(1)))
                },
                ManifestFile {
                    partition_spec_id: 2,
                    first_row_id: Some(13),
                    ..data_manifest(
                        M1_PATH,
                        bounds_summary(Datum::float(1.1), Datum::float(2.1)),
                    )
                },
            ],
        };

        let file_io = FileIO::new_with_fs();
        let tmp_dir = TempDir::new().unwrap();
        let full_path = path_in(&tmp_dir, "simple_manifest_list_v3.avro");

        let mut writer = ManifestListWriter::v3(
            file_io
                .new_output(full_path.clone())
                .unwrap()
                .writer()
                .await
                .unwrap(),
            377075049360453639,
            Some(377075049360453639),
            1,
            Some(10),
        );

        writer
            .add_manifests(manifest_list.entries.clone().into_iter())
            .unwrap();
        writer.close().await.unwrap();

        assert_file_parses_to(&full_path, FormatVersion::V3, &manifest_list);
    }
}