iceberg/spec/manifest_list/
mod.rs1mod _const_schema;
21pub(super) mod _serde;
22mod manifest_file;
23mod reader;
24mod writer;
25
26use apache_avro::types::Value;
27use apache_avro::{Reader, from_value};
28pub use manifest_file::*;
29pub use reader::*;
30pub use serde_bytes::ByteBuf;
31pub use writer::*;
32
33use self::_const_schema::MANIFEST_LIST_AVRO_SCHEMA_V1;
34use super::FormatVersion;
35use crate::error::Result;
36
/// Sentinel sequence number (`-1`).
///
/// NOTE(review): presumably stands in for a sequence number that has not been
/// assigned yet; nothing in this part of the file reads it — confirm against
/// the manifest list writer before relying on that meaning.
pub const UNASSIGNED_SEQUENCE_NUMBER: i64 = -1;
39
/// An in-memory manifest list: an ordered collection of [`ManifestFile`] entries.
///
/// Instances are produced by [`ManifestList::parse_with_version`]; the entries
/// are exposed by reference through [`ManifestList::entries`] and by value
/// through [`ManifestList::consume_entries`].
#[derive(Debug, Clone, PartialEq)]
pub struct ManifestList {
    /// The manifest files recorded in this list, in the order they were read.
    entries: Vec<ManifestFile>,
}
58
59impl ManifestList {
60 pub fn parse_with_version(bs: &[u8], version: FormatVersion) -> Result<ManifestList> {
62 match version {
63 FormatVersion::V1 => {
64 let reader = Reader::with_schema(&MANIFEST_LIST_AVRO_SCHEMA_V1, bs)?;
65 let values = Value::Array(reader.collect::<std::result::Result<Vec<Value>, _>>()?);
66 from_value::<_serde::ManifestListV1>(&values)?.try_into()
67 }
68 FormatVersion::V2 => {
69 let reader = Reader::new(bs)?;
70 let values = Value::Array(reader.collect::<std::result::Result<Vec<Value>, _>>()?);
71 from_value::<_serde::ManifestListV2>(&values)?.try_into()
72 }
73 FormatVersion::V3 => {
74 let reader = Reader::new(bs)?;
75 let values = Value::Array(reader.collect::<std::result::Result<Vec<Value>, _>>()?);
76 from_value::<_serde::ManifestListV3>(&values)?.try_into()
77 }
78 }
79 }
80
81 pub fn entries(&self) -> &[ManifestFile] {
83 &self.entries
84 }
85
86 pub fn consume_entries(self) -> impl IntoIterator<Item = ManifestFile> {
88 Box::new(self.entries.into_iter())
89 }
90}
91
92#[cfg(test)]
93mod test {
94 use std::fs;
95
96 use apache_avro::{Codec, Writer};
97 use tempfile::TempDir;
98
99 use super::_const_schema::MANIFEST_LIST_AVRO_SCHEMA_V2;
100 use super::_serde::ManifestFileV2;
101 use super::*;
102 use crate::io::FileIO;
103 use crate::spec::{Datum, FieldSummary, ManifestContentType, ManifestFile};
104
105 #[tokio::test]
106 async fn test_parse_manifest_list_v1() {
107 let manifest_list = ManifestList {
108 entries: vec![
109 ManifestFile {
110 manifest_path: "/opt/bitnami/spark/warehouse/db/table/metadata/10d28031-9739-484c-92db-cdf2975cead4-m0.avro".to_string(),
111 manifest_length: 5806,
112 partition_spec_id: 0,
113 content: ManifestContentType::Data,
114 sequence_number: 0,
115 min_sequence_number: 0,
116 added_snapshot_id: 1646658105718557341,
117 added_files_count: Some(3),
118 existing_files_count: Some(0),
119 deleted_files_count: Some(0),
120 added_rows_count: Some(3),
121 existing_rows_count: Some(0),
122 deleted_rows_count: Some(0),
123 partitions: Some(vec![]),
124 key_metadata: None,
125 first_row_id: None,
126 }
127 ]
128 };
129
130 let file_io = FileIO::new_with_fs();
131
132 let tmp_dir = TempDir::new().unwrap();
133 let file_name = "simple_manifest_list_v1.avro";
134 let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
135
136 let mut writer = ManifestListWriter::v1(
137 file_io
138 .new_output(full_path.clone())
139 .unwrap()
140 .writer()
141 .await
142 .unwrap(),
143 1646658105718557341,
144 Some(1646658105718557341),
145 );
146
147 writer
148 .add_manifests(manifest_list.entries.clone().into_iter())
149 .unwrap();
150 writer.close().await.unwrap();
151
152 let bs = fs::read(full_path).expect("read_file must succeed");
153
154 let parsed_manifest_list =
155 ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V1).unwrap();
156
157 assert_eq!(manifest_list, parsed_manifest_list);
158 }
159
160 #[tokio::test]
161 async fn test_parse_manifest_list_v2() {
162 let manifest_list = ManifestList {
163 entries: vec![
164 ManifestFile {
165 manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
166 manifest_length: 6926,
167 partition_spec_id: 1,
168 content: ManifestContentType::Data,
169 sequence_number: 1,
170 min_sequence_number: 1,
171 added_snapshot_id: 377075049360453639,
172 added_files_count: Some(1),
173 existing_files_count: Some(0),
174 deleted_files_count: Some(0),
175 added_rows_count: Some(3),
176 existing_rows_count: Some(0),
177 deleted_rows_count: Some(0),
178 partitions: Some(
179 vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
180 ),
181 key_metadata: None,
182 first_row_id: None,
183 },
184 ManifestFile {
185 manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m1.avro".to_string(),
186 manifest_length: 6926,
187 partition_spec_id: 2,
188 content: ManifestContentType::Data,
189 sequence_number: 1,
190 min_sequence_number: 1,
191 added_snapshot_id: 377075049360453639,
192 added_files_count: Some(1),
193 existing_files_count: Some(0),
194 deleted_files_count: Some(0),
195 added_rows_count: Some(3),
196 existing_rows_count: Some(0),
197 deleted_rows_count: Some(0),
198 partitions: Some(
199 vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::float(1.1).to_bytes().unwrap()), upper_bound: Some(Datum::float(2.1).to_bytes().unwrap())}]
200 ),
201 key_metadata: None,
202 first_row_id: None,
203 }
204 ]
205 };
206
207 let file_io = FileIO::new_with_fs();
208
209 let tmp_dir = TempDir::new().unwrap();
210 let file_name = "simple_manifest_list_v1.avro";
211 let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
212
213 let mut writer = ManifestListWriter::v2(
214 file_io
215 .new_output(full_path.clone())
216 .unwrap()
217 .writer()
218 .await
219 .unwrap(),
220 1646658105718557341,
221 Some(1646658105718557341),
222 1,
223 );
224
225 writer
226 .add_manifests(manifest_list.entries.clone().into_iter())
227 .unwrap();
228 writer.close().await.unwrap();
229
230 let bs = fs::read(full_path).expect("read_file must succeed");
231
232 let parsed_manifest_list =
233 ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2).unwrap();
234
235 assert_eq!(manifest_list, parsed_manifest_list);
236 }
237
238 #[test]
239 fn test_parse_snappy_manifest_list_v2() {
240 let manifest_list = ManifestList {
241 entries: vec![ManifestFile {
242 manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/snappy-m0.avro".to_string(),
243 manifest_length: 6926,
244 partition_spec_id: 1,
245 content: ManifestContentType::Data,
246 sequence_number: 1,
247 min_sequence_number: 1,
248 added_snapshot_id: 377075049360453639,
249 added_files_count: Some(1),
250 existing_files_count: Some(0),
251 deleted_files_count: Some(0),
252 added_rows_count: Some(3),
253 existing_rows_count: Some(0),
254 deleted_rows_count: Some(0),
255 partitions: Some(vec![FieldSummary {
256 contains_null: false,
257 contains_nan: Some(false),
258 lower_bound: Some(Datum::long(1).to_bytes().unwrap()),
259 upper_bound: Some(Datum::long(1).to_bytes().unwrap()),
260 }]),
261 key_metadata: None,
262 first_row_id: None,
263 }],
264 };
265
266 let manifest_entry: ManifestFileV2 = manifest_list.entries[0].clone().try_into().unwrap();
267 let mut writer =
268 Writer::with_codec(&MANIFEST_LIST_AVRO_SCHEMA_V2, Vec::new(), Codec::Snappy);
269 writer.append_ser(manifest_entry).unwrap();
270 let bs = writer.into_inner().unwrap();
271
272 let parsed_manifest_list =
273 ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2).unwrap();
274
275 assert_eq!(manifest_list, parsed_manifest_list);
276 }
277
278 #[tokio::test]
279 async fn test_parse_manifest_list_v3() {
280 let manifest_list = ManifestList {
281 entries: vec![
282 ManifestFile {
283 manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
284 manifest_length: 6926,
285 partition_spec_id: 1,
286 content: ManifestContentType::Data,
287 sequence_number: 1,
288 min_sequence_number: 1,
289 added_snapshot_id: 377075049360453639,
290 added_files_count: Some(1),
291 existing_files_count: Some(0),
292 deleted_files_count: Some(0),
293 added_rows_count: Some(3),
294 existing_rows_count: Some(0),
295 deleted_rows_count: Some(0),
296 partitions: Some(
297 vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::long(1).to_bytes().unwrap()), upper_bound: Some(Datum::long(1).to_bytes().unwrap())}]
298 ),
299 key_metadata: None,
300 first_row_id: Some(10),
301 },
302 ManifestFile {
303 manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m1.avro".to_string(),
304 manifest_length: 6926,
305 partition_spec_id: 2,
306 content: ManifestContentType::Data,
307 sequence_number: 1,
308 min_sequence_number: 1,
309 added_snapshot_id: 377075049360453639,
310 added_files_count: Some(1),
311 existing_files_count: Some(0),
312 deleted_files_count: Some(0),
313 added_rows_count: Some(3),
314 existing_rows_count: Some(0),
315 deleted_rows_count: Some(0),
316 partitions: Some(
317 vec![FieldSummary { contains_null: false, contains_nan: Some(false), lower_bound: Some(Datum::float(1.1).to_bytes().unwrap()), upper_bound: Some(Datum::float(2.1).to_bytes().unwrap())}]
318 ),
319 key_metadata: None,
320 first_row_id: Some(13),
321 }
322 ]
323 };
324
325 let file_io = FileIO::new_with_fs();
326
327 let tmp_dir = TempDir::new().unwrap();
328 let file_name = "simple_manifest_list_v3.avro";
329 let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
330
331 let mut writer = ManifestListWriter::v3(
332 file_io
333 .new_output(full_path.clone())
334 .unwrap()
335 .writer()
336 .await
337 .unwrap(),
338 377075049360453639,
339 Some(377075049360453639),
340 1,
341 Some(10),
342 );
343
344 writer
345 .add_manifests(manifest_list.entries.clone().into_iter())
346 .unwrap();
347 writer.close().await.unwrap();
348
349 let bs = fs::read(full_path).expect("read_file must succeed");
350
351 let parsed_manifest_list =
352 ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V3).unwrap();
353
354 assert_eq!(manifest_list, parsed_manifest_list);
355 }
356}