use std::collections::HashMap;

use serde_derive::{Deserialize, Serialize};
use serde_with::serde_as;

use super::{Datum, ManifestEntry, Schema, Struct};
use crate::spec::{FormatVersion, Literal, RawLiteral, StructType, Type};
use crate::{Error, ErrorKind, metadata_columns};

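/// Serialized form of a v2 manifest entry, mirroring the `manifest_entry`
/// record of the v2 manifest file schema.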
#[derive(Serialize, Deserialize)]
pub(super) struct ManifestEntryV2 {
    status: i32,
    snapshot_id: Option<i64>,
    sequence_number: Option<i64>,
    file_sequence_number: Option<i64>,
    data_file: DataFileSerde,
}

impl ManifestEntryV2 {
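    /// Converts a [`ManifestEntry`] into its v2 serialized form, writing the
    /// data file against the given partition type.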
    pub fn try_from(value: ManifestEntry, partition_type: &StructType) -> Result<Self, Error> {
        Ok(Self {
            status: value.status as i32,
            snapshot_id: value.snapshot_id,
            sequence_number: value.sequence_number,
            file_sequence_number: value.file_sequence_number,
            data_file: DataFileSerde::try_from(value.data_file, partition_type, FormatVersion::V2)?,
        })
    }

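    /// Converts the serialized form back into a [`ManifestEntry`], resolving
    /// the partition struct and column bounds against the given schema.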
    pub fn try_into(
        self,
        partition_spec_id: i32,
        partition_type: &StructType,
        schema: &Schema,
    ) -> Result<ManifestEntry, Error> {
        Ok(ManifestEntry {
            status: self.status.try_into()?,
            snapshot_id: self.snapshot_id,
            sequence_number: self.sequence_number,
            file_sequence_number: self.file_sequence_number,
            data_file: self
                .data_file
                .try_into(partition_spec_id, partition_type, schema)?,
        })
    }
}

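/// Serialized form of a v1 manifest entry. Unlike v2, `snapshot_id` is
/// required and there are no sequence number fields.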
#[derive(Serialize, Deserialize)]
pub(super) struct ManifestEntryV1 {
    status: i32,
    pub snapshot_id: i64,
    data_file: DataFileSerde,
}

impl ManifestEntryV1 {
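    /// Converts a [`ManifestEntry`] into its v1 serialized form. A missing
    /// snapshot id is written as 0.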
    pub fn try_from(value: ManifestEntry, partition_type: &StructType) -> Result<Self, Error> {
        Ok(Self {
            status: value.status as i32,
            snapshot_id: value.snapshot_id.unwrap_or_default(),
            data_file: DataFileSerde::try_from(value.data_file, partition_type, FormatVersion::V1)?,
        })
    }

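    /// Converts the v1 serialized form into a [`ManifestEntry`]. V1 manifests
    /// carry no sequence numbers, so both default to `Some(0)`.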
    pub fn try_into(
        self,
        partition_spec_id: i32,
        partition_type: &StructType,
        schema: &Schema,
    ) -> Result<ManifestEntry, Error> {
        Ok(ManifestEntry {
            status: self.status.try_into()?,
            snapshot_id: Some(self.snapshot_id),
            sequence_number: Some(0),
            file_sequence_number: Some(0),
            data_file: self
                .data_file
                .try_into(partition_spec_id, partition_type, schema)?,
        })
    }
}

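/// Serialized form of a `data_file` struct, shared between the v1 and v2
/// manifest entry formats. Optional fields cover the union of both versions.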
#[serde_as]
#[derive(Serialize, Deserialize)]
pub(super) struct DataFileSerde {
    #[serde(default)]
    content: i32,
    file_path: String,
    file_format: String,
    partition: RawLiteral,
    record_count: i64,
    file_size_in_bytes: i64,
    #[serde(skip_deserializing, skip_serializing_if = "Option::is_none")]
    block_size_in_bytes: Option<i64>,
    column_sizes: Option<Vec<I64Entry>>,
    value_counts: Option<Vec<I64Entry>>,
    null_value_counts: Option<Vec<I64Entry>>,
    nan_value_counts: Option<Vec<I64Entry>>,
    lower_bounds: Option<Vec<BytesEntry>>,
    upper_bounds: Option<Vec<BytesEntry>>,
    key_metadata: Option<serde_bytes::ByteBuf>,
    split_offsets: Option<Vec<i64>>,
    equality_ids: Option<Vec<i32>>,
    sort_order_id: Option<i32>,
    first_row_id: Option<i64>,
    referenced_data_file: Option<String>,
    content_offset: Option<i64>,
    content_size_in_bytes: Option<i64>,
}

impl DataFileSerde {
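    /// Converts a [`super::DataFile`] into its serialized form. The deprecated
    /// `block_size_in_bytes` field is written as 0 for v1 manifests and
    /// omitted entirely for v2.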
    pub fn try_from(
        value: super::DataFile,
        partition_type: &StructType,
        format_version: FormatVersion,
    ) -> Result<Self, Error> {
        let block_size_in_bytes = if format_version == FormatVersion::V1 {
            Some(0)
        } else {
            None
        };
        Ok(Self {
            content: value.content as i32,
            file_path: value.file_path,
            file_format: value.file_format.to_string().to_ascii_uppercase(),
            partition: RawLiteral::try_from(
                Literal::Struct(value.partition),
                &Type::Struct(partition_type.clone()),
            )?,
            record_count: value.record_count.try_into()?,
            file_size_in_bytes: value.file_size_in_bytes.try_into()?,
            block_size_in_bytes,
            column_sizes: Some(to_i64_entry(value.column_sizes)?),
            value_counts: Some(to_i64_entry(value.value_counts)?),
            null_value_counts: Some(to_i64_entry(value.null_value_counts)?),
            nan_value_counts: Some(to_i64_entry(value.nan_value_counts)?),
            lower_bounds: Some(to_bytes_entry(value.lower_bounds)?),
            upper_bounds: Some(to_bytes_entry(value.upper_bounds)?),
            key_metadata: value.key_metadata.map(serde_bytes::ByteBuf::from),
            split_offsets: value.split_offsets,
            equality_ids: value.equality_ids,
            sort_order_id: value.sort_order_id,
            first_row_id: value.first_row_id,
            referenced_data_file: value.referenced_data_file,
            content_offset: value.content_offset,
            content_size_in_bytes: value.content_size_in_bytes,
        })
    }

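    /// Converts the serialized form into a [`super::DataFile`]. A missing
    /// partition value deserializes to an empty struct, and absent stats maps
    /// default to empty maps.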
    pub fn try_into(
        self,
        partition_spec_id: i32,
        partition_type: &StructType,
        schema: &Schema,
    ) -> Result<super::DataFile, Error> {
        let partition = self
            .partition
            .try_into(&Type::Struct(partition_type.clone()))?
            .map(|v| {
                if let Literal::Struct(v) = v {
                    Ok(v)
                } else {
                    Err(Error::new(
                        ErrorKind::DataInvalid,
                        "partition value is not a struct",
                    ))
                }
            })
            .transpose()?
            .unwrap_or(Struct::empty());
        Ok(super::DataFile {
            content: self.content.try_into()?,
            file_path: self.file_path,
            file_format: self.file_format.parse()?,
            partition,
            record_count: self.record_count.try_into()?,
            file_size_in_bytes: self.file_size_in_bytes.try_into()?,
            column_sizes: self
                .column_sizes
                .map(parse_i64_entry)
                .transpose()?
                .unwrap_or_default(),
            value_counts: self
                .value_counts
                .map(parse_i64_entry)
                .transpose()?
                .unwrap_or_default(),
            null_value_counts: self
                .null_value_counts
                .map(parse_i64_entry)
                .transpose()?
                .unwrap_or_default(),
            nan_value_counts: self
                .nan_value_counts
                .map(parse_i64_entry)
                .transpose()?
                .unwrap_or_default(),
            lower_bounds: self
                .lower_bounds
                .map(|v| parse_bytes_entry(v, schema))
                .transpose()?
                .unwrap_or_default(),
            upper_bounds: self
                .upper_bounds
                .map(|v| parse_bytes_entry(v, schema))
                .transpose()?
                .unwrap_or_default(),
            key_metadata: self.key_metadata.map(|v| v.to_vec()),
            split_offsets: self.split_offsets,
            equality_ids: self.equality_ids,
            sort_order_id: self.sort_order_id,
            partition_spec_id,
            first_row_id: self.first_row_id,
            referenced_data_file: self.referenced_data_file,
            content_offset: self.content_offset,
            content_size_in_bytes: self.content_size_in_bytes,
        })
    }
}

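/// A single `(field id, bytes)` pair from the `lower_bounds`/`upper_bounds`
/// maps.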
#[serde_as]
#[derive(Serialize, Deserialize)]
#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
struct BytesEntry {
    key: i32,
    value: serde_bytes::ByteBuf,
}

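/// Decodes bound entries into [`Datum`]s using the field types from the
/// schema, falling back to the reserved metadata columns. Entries whose field
/// id is unknown are silently skipped.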
fn parse_bytes_entry(v: Vec<BytesEntry>, schema: &Schema) -> Result<HashMap<i32, Datum>, Error> {
    let mut m = HashMap::with_capacity(v.len());
    for entry in v {
        let field = schema
            .field_by_id(entry.key)
            .or_else(|| metadata_columns::get_metadata_field(entry.key).ok());

        if let Some(field) = field {
            let data_type = field
                .field_type
                .as_primitive_type()
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!("field {} is not a primitive type", field.name),
                    )
                })?
                .clone();
            m.insert(entry.key, Datum::try_from_bytes(&entry.value, data_type)?);
        }
    }

    Ok(m)
}

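/// Encodes a map of `(field id, Datum)` bounds into serializable
/// [`BytesEntry`] values.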
fn to_bytes_entry(v: impl IntoIterator<Item = (i32, Datum)>) -> Result<Vec<BytesEntry>, Error> {
    let iter = v.into_iter();
    let mut bs = Vec::with_capacity(iter.size_hint().0);
    for (k, d) in iter {
        bs.push(BytesEntry {
            key: k,
            value: d.to_bytes()?,
        });
    }
    Ok(bs)
}

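/// A single `(field id, i64)` pair from the count/size maps.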
#[derive(Serialize, Deserialize)]
#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
struct I64Entry {
    key: i32,
    value: i64,
}

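/// Decodes count/size entries into a `HashMap<i32, u64>`.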
fn parse_i64_entry(v: Vec<I64Entry>) -> Result<HashMap<i32, u64>, Error> {
    let mut m = HashMap::with_capacity(v.len());
    for entry in v {
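        // Entries hold counts/sizes, so a negative value cannot be represented
        // as u64; such entries are dropped rather than treated as an error.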
        if let Ok(v) = entry.value.try_into() {
            m.insert(entry.key, v);
        }
    }
    Ok(m)
}

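/// Encodes a `HashMap<i32, u64>` of counts/sizes into serializable
/// [`I64Entry`] values, failing if a value overflows `i64`.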
#[allow(unused_mut)]
fn to_i64_entry(entries: HashMap<i32, u64>) -> Result<Vec<I64Entry>, Error> {
    let mut i64_entries = entries
        .iter()
        .map(|e| {
            Ok(I64Entry {
                key: *e.0,
                value: (*e.1).try_into()?,
            })
        })
        .collect::<Result<Vec<_>, Error>>()?;

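    // HashMap iteration order is unspecified; sort in tests so the serialized
    // entries compare deterministically.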
    #[cfg(test)]
    i64_entries.sort_by_key(|e| e.key);

    Ok(i64_entries)
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::io::Cursor;
    use std::sync::Arc;

    use crate::spec::manifest::_serde::{I64Entry, parse_i64_entry};
    use crate::spec::{
        DataContentType, DataFile, DataFileFormat, Datum, FormatVersion, NestedField,
        PrimitiveType, Schema, Struct, StructType, Type, read_data_files_from_avro,
        write_data_files_to_avro,
    };

    #[test]
    fn test_parse_negative_manifest_entry() {
        let entries = vec![I64Entry { key: 1, value: -1 }, I64Entry {
            key: 2,
            value: 3,
        }];

        let ret = parse_i64_entry(entries).unwrap();

        let expected_ret = HashMap::from([(2, 3)]);
        assert_eq!(ret, expected_ret, "Negative i64 entry should be ignored!");
    }

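    /// Builds the simple three-column schema shared by the tests below.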
    fn schema() -> Arc<Schema> {
        Arc::new(
            Schema::builder()
                .with_fields(vec![
                    Arc::new(NestedField::optional(
                        1,
                        "v1",
                        Type::Primitive(PrimitiveType::Int),
                    )),
                    Arc::new(NestedField::optional(
                        2,
                        "v2",
                        Type::Primitive(PrimitiveType::String),
                    )),
                    Arc::new(NestedField::optional(
                        3,
                        "v3",
                        Type::Primitive(PrimitiveType::String),
                    )),
                ])
                .build()
                .unwrap(),
        )
    }

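    /// Builds a single unpartitioned Parquet data file with stats for the
    /// round-trip tests.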
    fn data_files() -> Vec<DataFile> {
        vec![DataFile {
            content: DataContentType::Data,
            file_path: "s3://testbucket/iceberg_data/iceberg_ctl/iceberg_db/iceberg_tbl/data/00000-7-45268d71-54eb-476c-b42c-942d880c04a1-00001.parquet".to_string(),
            file_format: DataFileFormat::Parquet,
            partition: Struct::empty(),
            record_count: 1,
            file_size_in_bytes: 875,
            column_sizes: HashMap::from([(1, 47), (2, 48), (3, 52)]),
            value_counts: HashMap::from([(1, 1), (2, 1), (3, 1)]),
            null_value_counts: HashMap::from([(1, 0), (2, 0), (3, 0)]),
            nan_value_counts: HashMap::new(),
            lower_bounds: HashMap::from([(1, Datum::int(1)), (2, Datum::string("a")), (3, Datum::string("AC/DC"))]),
            upper_bounds: HashMap::from([(1, Datum::int(1)), (2, Datum::string("a")), (3, Datum::string("AC/DC"))]),
            key_metadata: None,
            split_offsets: Some(vec![4]),
            equality_ids: None,
            sort_order_id: Some(0),
            partition_spec_id: 0,
            first_row_id: None,
            referenced_data_file: None,
            content_offset: None,
            content_size_in_bytes: None,
        }]
    }

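    /// Round-trips a data file through the v2 Avro writer and reader and
    /// checks the result is unchanged.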
    #[tokio::test]
    async fn test_data_file_serialize_deserialize() {
        let schema = schema();
        let data_files = data_files();

        let mut buffer = Vec::new();
        let _ = write_data_files_to_avro(
            &mut buffer,
            data_files.clone().into_iter(),
            &StructType::new(vec![]),
            FormatVersion::V2,
        )
        .unwrap();

        let actual_data_file = read_data_files_from_avro(
            &mut Cursor::new(buffer),
            &schema,
            0,
            &StructType::new(vec![]),
            FormatVersion::V2,
        )
        .unwrap();

        assert_eq!(data_files, actual_data_file);
    }

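    /// Writes a data file as v1 and reads it back as v2, checking that
    /// `content` defaults to `Data` (v1 data files carry no content field,
    /// which is why `DataFileSerde::content` has `#[serde(default)]`).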
    #[tokio::test]
    async fn test_data_file_serialize_deserialize_v1_data_on_v2_reader() {
        let schema = schema();
        let data_files = data_files();

        let mut buffer = Vec::new();
        let _ = write_data_files_to_avro(
            &mut buffer,
            data_files.clone().into_iter(),
            &StructType::new(vec![]),
            FormatVersion::V1,
        )
        .unwrap();

        let actual_data_file = read_data_files_from_avro(
            &mut Cursor::new(buffer),
            &schema,
            0,
            &StructType::new(vec![]),
            FormatVersion::V2,
        )
        .unwrap();

        assert_eq!(actual_data_file[0].content, DataContentType::Data)
    }

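    /// Projects a v1 manifest entry into the richer v2 representation and
    /// checks the defaulted and preserved fields.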
    #[test]
    fn test_manifest_entry_v1_to_v2_projection() {
        use crate::spec::manifest::_serde::{DataFileSerde, ManifestEntryV1};
        use crate::spec::{Literal, RawLiteral, Struct, StructType};

        let partition = RawLiteral::try_from(
            Literal::Struct(Struct::empty()),
            &Type::Struct(StructType::new(vec![])),
        )
        .unwrap();

        let v1_entry = ManifestEntryV1 {
            status: 1,
            snapshot_id: 12345,
            data_file: DataFileSerde {
                content: 0,
                file_path: "test/path.parquet".to_string(),
                file_format: "PARQUET".to_string(),
                partition,
                record_count: 100,
                file_size_in_bytes: 1024,
                block_size_in_bytes: Some(0),
                column_sizes: None,
                value_counts: None,
                null_value_counts: None,
                nan_value_counts: None,
                lower_bounds: None,
                upper_bounds: None,
                key_metadata: None,
                split_offsets: None,
                equality_ids: None,
                sort_order_id: None,
                first_row_id: None,
                referenced_data_file: None,
                content_offset: None,
                content_size_in_bytes: None,
            },
        };

        let v2_entry = v1_entry
            .try_into(0, &StructType::new(vec![]), &schema())
            .unwrap();

        assert_eq!(
            v2_entry.sequence_number,
            Some(0),
            "ManifestEntryV1::try_into() should set sequence_number to 0"
        );
        assert_eq!(
            v2_entry.file_sequence_number,
            Some(0),
            "ManifestEntryV1::try_into() should set file_sequence_number to 0"
        );
        assert_eq!(
            v2_entry.snapshot_id,
            Some(12345),
            "snapshot_id should be preserved during conversion"
        );

        assert_eq!(
            v2_entry.data_file.content,
            DataContentType::Data,
            "DataFileSerde should convert content 0 to DataContentType::Data"
        );
        assert_eq!(
            v2_entry.data_file.equality_ids, None,
            "DataFileSerde should preserve None equality_ids as None"
        );

        assert_eq!(v2_entry.data_file.file_path, "test/path.parquet");
        assert_eq!(v2_entry.data_file.record_count, 100);
        assert_eq!(v2_entry.data_file.file_size_in_bytes, 1024);
    }

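    /// Converts a v1-style `DataFileSerde` (with the deprecated block size
    /// field set) and checks that defaults and required fields come through.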
    #[test]
    fn test_data_file_serde_v1_field_defaults() {
        use crate::spec::manifest::_serde::DataFileSerde;
        use crate::spec::{Literal, RawLiteral, Struct, StructType};

        let partition = RawLiteral::try_from(
            Literal::Struct(Struct::empty()),
            &Type::Struct(StructType::new(vec![])),
        )
        .unwrap();

        let v1_style_data_file = DataFileSerde {
            content: 0,
            file_path: "test/data.parquet".to_string(),
            file_format: "PARQUET".to_string(),
            partition,
            record_count: 500,
            file_size_in_bytes: 2048,
            block_size_in_bytes: Some(1024),
            column_sizes: None,
            value_counts: None,
            null_value_counts: None,
            nan_value_counts: None,
            lower_bounds: None,
            upper_bounds: None,
            key_metadata: None,
            split_offsets: None,
            equality_ids: None,
            sort_order_id: None,
            first_row_id: None,
            referenced_data_file: None,
            content_offset: None,
            content_size_in_bytes: None,
        };

        let data_file = v1_style_data_file
            .try_into(0, &StructType::new(vec![]), &schema())
            .unwrap();

        assert_eq!(
            data_file.content,
            DataContentType::Data,
            "content 0 should convert to DataContentType::Data"
        );
        assert_eq!(
            data_file.equality_ids, None,
            "None equality_ids should remain as None"
        );

        assert_eq!(data_file.file_path, "test/data.parquet");
        assert_eq!(data_file.file_format, DataFileFormat::Parquet);
        assert_eq!(data_file.record_count, 500);
        assert_eq!(data_file.file_size_in_bytes, 2048);
        assert_eq!(data_file.partition_spec_id, 0);
    }

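    /// Checks that bounds keyed by a reserved metadata column id (here `_pos`)
    /// are resolved via `metadata_columns` rather than the table schema.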
    #[test]
    fn test_parse_bytes_entry_with_metadata_column() {
        use crate::metadata_columns::RESERVED_FIELD_ID_POS;
        use crate::spec::manifest::_serde::{BytesEntry, parse_bytes_entry};

        let test_schema = schema();

        let pos_value: i64 = 42;
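        // Encode the value as 8 little-endian bytes, matching Iceberg's
        // single-value binary serialization for `long`.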
        let bytes_entry = BytesEntry {
            key: RESERVED_FIELD_ID_POS,
            value: serde_bytes::ByteBuf::from(pos_value.to_le_bytes().to_vec()),
        };

        let result = parse_bytes_entry(vec![bytes_entry], &test_schema).unwrap();

        assert!(
            result.contains_key(&RESERVED_FIELD_ID_POS),
            "_pos metadata field should be parsed"
        );
        assert_eq!(
            result.get(&RESERVED_FIELD_ID_POS),
            Some(&Datum::long(pos_value)),
            "_pos should be parsed as long with correct value"
        );
    }
}