1use std::sync::Arc;
19
20use apache_avro::Schema as AvroSchema;
21use once_cell::sync::Lazy;
22use typed_builder::TypedBuilder;
23
24use crate::avro::schema_to_avro_schema;
25use crate::error::Result;
26use crate::spec::{
27 DataContentType, DataFile, INITIAL_SEQUENCE_NUMBER, ListType, Literal, ManifestFile, MapType,
28 NestedField, NestedFieldRef, PrimitiveLiteral, PrimitiveType, Schema, StructType, Type,
29};
30use crate::{Error, ErrorKind};
31
32pub type ManifestEntryRef = Arc<ManifestEntry>;
34
35#[derive(Debug, PartialEq, Eq, Clone, TypedBuilder)]
39pub struct ManifestEntry {
40 pub status: ManifestStatus,
44 #[builder(default, setter(strip_option(fallback = snapshot_id_opt)))]
49 pub snapshot_id: Option<i64>,
50 #[builder(default, setter(strip_option(fallback = sequence_number_opt)))]
55 pub sequence_number: Option<i64>,
56 #[builder(default, setter(strip_option(fallback = file_sequence_number_opt)))]
61 pub file_sequence_number: Option<i64>,
62 pub data_file: DataFile,
66}
67
68impl ManifestEntry {
69 pub fn is_alive(&self) -> bool {
71 matches!(
72 self.status,
73 ManifestStatus::Added | ManifestStatus::Existing
74 )
75 }
76
77 pub fn status(&self) -> ManifestStatus {
79 self.status
80 }
81
82 #[inline]
84 pub fn content_type(&self) -> DataContentType {
85 self.data_file.content
86 }
87
88 #[inline]
90 pub fn file_format(&self) -> DataFileFormat {
91 self.data_file.file_format
92 }
93
94 #[inline]
96 pub fn file_path(&self) -> &str {
97 &self.data_file.file_path
98 }
99
100 #[inline]
102 pub fn record_count(&self) -> u64 {
103 self.data_file.record_count
104 }
105
106 pub(crate) fn inherit_data(&mut self, snapshot_entry: &ManifestFile) {
108 if self.snapshot_id.is_none() {
109 self.snapshot_id = Some(snapshot_entry.added_snapshot_id);
110 }
111
112 if self.sequence_number.is_none()
113 && (self.status == ManifestStatus::Added
114 || snapshot_entry.sequence_number == INITIAL_SEQUENCE_NUMBER)
115 {
116 self.sequence_number = Some(snapshot_entry.sequence_number);
117 }
118
119 if self.file_sequence_number.is_none()
120 && (self.status == ManifestStatus::Added
121 || snapshot_entry.sequence_number == INITIAL_SEQUENCE_NUMBER)
122 {
123 self.file_sequence_number = Some(snapshot_entry.sequence_number);
124 }
125 }
126
127 #[inline]
129 pub fn snapshot_id(&self) -> Option<i64> {
130 self.snapshot_id
131 }
132
133 #[inline]
135 pub fn sequence_number(&self) -> Option<i64> {
136 self.sequence_number
137 }
138
139 #[inline]
141 pub fn file_size_in_bytes(&self) -> u64 {
142 self.data_file.file_size_in_bytes
143 }
144
145 #[inline]
147 pub fn data_file(&self) -> &DataFile {
148 &self.data_file
149 }
150}
151
/// Status recorded for a file in a manifest entry.
///
/// The explicit discriminants are the integer values accepted by the
/// `TryFrom<i32>` conversion for this type.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ManifestStatus {
    /// Integer value 0.
    Existing = 0,
    /// Integer value 1.
    Added = 1,
    /// Integer value 2.
    Deleted = 2,
}
164
165impl TryFrom<i32> for ManifestStatus {
166 type Error = Error;
167
168 fn try_from(v: i32) -> Result<ManifestStatus> {
169 match v {
170 0 => Ok(ManifestStatus::Existing),
171 1 => Ok(ManifestStatus::Added),
172 2 => Ok(ManifestStatus::Deleted),
173 _ => Err(Error::new(
174 ErrorKind::DataInvalid,
175 format!("manifest status {v} is invalid"),
176 )),
177 }
178 }
179}
180
181use super::DataFileFormat;
182
183static STATUS: Lazy<NestedFieldRef> = {
184 Lazy::new(|| {
185 Arc::new(NestedField::required(
186 0,
187 "status",
188 Type::Primitive(PrimitiveType::Int),
189 ))
190 })
191};
192
193static SNAPSHOT_ID_V1: Lazy<NestedFieldRef> = {
194 Lazy::new(|| {
195 Arc::new(NestedField::required(
196 1,
197 "snapshot_id",
198 Type::Primitive(PrimitiveType::Long),
199 ))
200 })
201};
202
203static SNAPSHOT_ID_V2: Lazy<NestedFieldRef> = {
204 Lazy::new(|| {
205 Arc::new(NestedField::optional(
206 1,
207 "snapshot_id",
208 Type::Primitive(PrimitiveType::Long),
209 ))
210 })
211};
212
213static SEQUENCE_NUMBER: Lazy<NestedFieldRef> = {
214 Lazy::new(|| {
215 Arc::new(NestedField::optional(
216 3,
217 "sequence_number",
218 Type::Primitive(PrimitiveType::Long),
219 ))
220 })
221};
222
223static FILE_SEQUENCE_NUMBER: Lazy<NestedFieldRef> = {
224 Lazy::new(|| {
225 Arc::new(NestedField::optional(
226 4,
227 "file_sequence_number",
228 Type::Primitive(PrimitiveType::Long),
229 ))
230 })
231};
232
233static CONTENT: Lazy<NestedFieldRef> = {
234 Lazy::new(|| {
235 Arc::new(
236 NestedField::required(134, "content", Type::Primitive(PrimitiveType::Int))
237 .with_initial_default(Literal::Primitive(PrimitiveLiteral::Int(0))),
239 )
240 })
241};
242
243static FILE_PATH: Lazy<NestedFieldRef> = {
244 Lazy::new(|| {
245 Arc::new(NestedField::required(
246 100,
247 "file_path",
248 Type::Primitive(PrimitiveType::String),
249 ))
250 })
251};
252
253static FILE_FORMAT: Lazy<NestedFieldRef> = {
254 Lazy::new(|| {
255 Arc::new(NestedField::required(
256 101,
257 "file_format",
258 Type::Primitive(PrimitiveType::String),
259 ))
260 })
261};
262
263static RECORD_COUNT: Lazy<NestedFieldRef> = {
264 Lazy::new(|| {
265 Arc::new(NestedField::required(
266 103,
267 "record_count",
268 Type::Primitive(PrimitiveType::Long),
269 ))
270 })
271};
272
273static FILE_SIZE_IN_BYTES: Lazy<NestedFieldRef> = {
274 Lazy::new(|| {
275 Arc::new(NestedField::required(
276 104,
277 "file_size_in_bytes",
278 Type::Primitive(PrimitiveType::Long),
279 ))
280 })
281};
282
283static BLOCK_SIZE_IN_BYTES: Lazy<NestedFieldRef> = {
285 Lazy::new(|| {
286 Arc::new(NestedField::required(
287 105,
288 "block_size_in_bytes",
289 Type::Primitive(PrimitiveType::Long),
290 ))
291 })
292};
293
294static COLUMN_SIZES: Lazy<NestedFieldRef> = {
295 Lazy::new(|| {
296 Arc::new(NestedField::optional(
297 108,
298 "column_sizes",
299 Type::Map(MapType {
300 key_field: Arc::new(NestedField::required(
301 117,
302 "key",
303 Type::Primitive(PrimitiveType::Int),
304 )),
305 value_field: Arc::new(NestedField::required(
306 118,
307 "value",
308 Type::Primitive(PrimitiveType::Long),
309 )),
310 }),
311 ))
312 })
313};
314
315static VALUE_COUNTS: Lazy<NestedFieldRef> = {
316 Lazy::new(|| {
317 Arc::new(NestedField::optional(
318 109,
319 "value_counts",
320 Type::Map(MapType {
321 key_field: Arc::new(NestedField::required(
322 119,
323 "key",
324 Type::Primitive(PrimitiveType::Int),
325 )),
326 value_field: Arc::new(NestedField::required(
327 120,
328 "value",
329 Type::Primitive(PrimitiveType::Long),
330 )),
331 }),
332 ))
333 })
334};
335
336static NULL_VALUE_COUNTS: Lazy<NestedFieldRef> = {
337 Lazy::new(|| {
338 Arc::new(NestedField::optional(
339 110,
340 "null_value_counts",
341 Type::Map(MapType {
342 key_field: Arc::new(NestedField::required(
343 121,
344 "key",
345 Type::Primitive(PrimitiveType::Int),
346 )),
347 value_field: Arc::new(NestedField::required(
348 122,
349 "value",
350 Type::Primitive(PrimitiveType::Long),
351 )),
352 }),
353 ))
354 })
355};
356
357static NAN_VALUE_COUNTS: Lazy<NestedFieldRef> = {
358 Lazy::new(|| {
359 Arc::new(NestedField::optional(
360 137,
361 "nan_value_counts",
362 Type::Map(MapType {
363 key_field: Arc::new(NestedField::required(
364 138,
365 "key",
366 Type::Primitive(PrimitiveType::Int),
367 )),
368 value_field: Arc::new(NestedField::required(
369 139,
370 "value",
371 Type::Primitive(PrimitiveType::Long),
372 )),
373 }),
374 ))
375 })
376};
377
378static LOWER_BOUNDS: Lazy<NestedFieldRef> = {
379 Lazy::new(|| {
380 Arc::new(NestedField::optional(
381 125,
382 "lower_bounds",
383 Type::Map(MapType {
384 key_field: Arc::new(NestedField::required(
385 126,
386 "key",
387 Type::Primitive(PrimitiveType::Int),
388 )),
389 value_field: Arc::new(NestedField::required(
390 127,
391 "value",
392 Type::Primitive(PrimitiveType::Binary),
393 )),
394 }),
395 ))
396 })
397};
398
399static UPPER_BOUNDS: Lazy<NestedFieldRef> = {
400 Lazy::new(|| {
401 Arc::new(NestedField::optional(
402 128,
403 "upper_bounds",
404 Type::Map(MapType {
405 key_field: Arc::new(NestedField::required(
406 129,
407 "key",
408 Type::Primitive(PrimitiveType::Int),
409 )),
410 value_field: Arc::new(NestedField::required(
411 130,
412 "value",
413 Type::Primitive(PrimitiveType::Binary),
414 )),
415 }),
416 ))
417 })
418};
419
420static KEY_METADATA: Lazy<NestedFieldRef> = {
421 Lazy::new(|| {
422 Arc::new(NestedField::optional(
423 131,
424 "key_metadata",
425 Type::Primitive(PrimitiveType::Binary),
426 ))
427 })
428};
429
430static SPLIT_OFFSETS: Lazy<NestedFieldRef> = {
431 Lazy::new(|| {
432 Arc::new(NestedField::optional(
433 132,
434 "split_offsets",
435 Type::List(ListType {
436 element_field: Arc::new(NestedField::required(
437 133,
438 "element",
439 Type::Primitive(PrimitiveType::Long),
440 )),
441 }),
442 ))
443 })
444};
445
446static EQUALITY_IDS: Lazy<NestedFieldRef> = {
447 Lazy::new(|| {
448 Arc::new(NestedField::optional(
449 135,
450 "equality_ids",
451 Type::List(ListType {
452 element_field: Arc::new(NestedField::required(
453 136,
454 "element",
455 Type::Primitive(PrimitiveType::Int),
456 )),
457 }),
458 ))
459 })
460};
461
462static SORT_ORDER_ID: Lazy<NestedFieldRef> = {
463 Lazy::new(|| {
464 Arc::new(NestedField::optional(
465 140,
466 "sort_order_id",
467 Type::Primitive(PrimitiveType::Int),
468 ))
469 })
470};
471
472static FIRST_ROW_ID: Lazy<NestedFieldRef> = {
473 Lazy::new(|| {
474 Arc::new(NestedField::optional(
475 142,
476 "first_row_id",
477 Type::Primitive(PrimitiveType::Long),
478 ))
479 })
480};
481
482static REFERENCE_DATA_FILE: Lazy<NestedFieldRef> = {
483 Lazy::new(|| {
484 Arc::new(NestedField::optional(
485 143,
486 "referenced_data_file",
487 Type::Primitive(PrimitiveType::String),
488 ))
489 })
490};
491
492static CONTENT_OFFSET: Lazy<NestedFieldRef> = {
493 Lazy::new(|| {
494 Arc::new(NestedField::optional(
495 144,
496 "content_offset",
497 Type::Primitive(PrimitiveType::Long),
498 ))
499 })
500};
501
502static CONTENT_SIZE_IN_BYTES: Lazy<NestedFieldRef> = {
503 Lazy::new(|| {
504 Arc::new(NestedField::optional(
505 145,
506 "content_size_in_bytes",
507 Type::Primitive(PrimitiveType::Long),
508 ))
509 })
510};
511
512fn data_file_fields_v3(partition_type: &StructType) -> Vec<NestedFieldRef> {
513 vec![
514 CONTENT.clone(),
515 FILE_PATH.clone(),
516 FILE_FORMAT.clone(),
517 Arc::new(NestedField::required(
518 102,
519 "partition",
520 Type::Struct(partition_type.clone()),
521 )),
522 RECORD_COUNT.clone(),
523 FILE_SIZE_IN_BYTES.clone(),
524 COLUMN_SIZES.clone(),
525 VALUE_COUNTS.clone(),
526 NULL_VALUE_COUNTS.clone(),
527 NAN_VALUE_COUNTS.clone(),
528 LOWER_BOUNDS.clone(),
529 UPPER_BOUNDS.clone(),
530 KEY_METADATA.clone(),
531 SPLIT_OFFSETS.clone(),
532 EQUALITY_IDS.clone(),
533 SORT_ORDER_ID.clone(),
534 FIRST_ROW_ID.clone(),
535 REFERENCE_DATA_FILE.clone(),
536 CONTENT_OFFSET.clone(),
537 CONTENT_SIZE_IN_BYTES.clone(),
538 ]
539}
540
541pub(super) fn data_file_schema_v3(partition_type: &StructType) -> Result<AvroSchema> {
542 let schema = Schema::builder()
543 .with_fields(data_file_fields_v3(partition_type))
544 .build()?;
545 schema_to_avro_schema("data_file", &schema)
546}
547
548fn data_file_fields_v2(partition_type: &StructType) -> Vec<NestedFieldRef> {
549 vec![
550 CONTENT.clone(),
551 FILE_PATH.clone(),
552 FILE_FORMAT.clone(),
553 Arc::new(NestedField::required(
554 102,
555 "partition",
556 Type::Struct(partition_type.clone()),
557 )),
558 RECORD_COUNT.clone(),
559 FILE_SIZE_IN_BYTES.clone(),
560 COLUMN_SIZES.clone(),
561 VALUE_COUNTS.clone(),
562 NULL_VALUE_COUNTS.clone(),
563 NAN_VALUE_COUNTS.clone(),
564 LOWER_BOUNDS.clone(),
565 UPPER_BOUNDS.clone(),
566 KEY_METADATA.clone(),
567 SPLIT_OFFSETS.clone(),
568 EQUALITY_IDS.clone(),
569 SORT_ORDER_ID.clone(),
570 FIRST_ROW_ID.clone(),
571 REFERENCE_DATA_FILE.clone(),
572 CONTENT_OFFSET.clone(),
575 CONTENT_SIZE_IN_BYTES.clone(),
576 ]
577}
578
579pub(super) fn data_file_schema_v2(partition_type: &StructType) -> Result<AvroSchema> {
580 let schema = Schema::builder()
581 .with_fields(data_file_fields_v2(partition_type))
582 .build()?;
583 schema_to_avro_schema("data_file", &schema)
584}
585
586pub(super) fn manifest_schema_v2(partition_type: &StructType) -> Result<AvroSchema> {
587 let fields = vec![
588 STATUS.clone(),
589 SNAPSHOT_ID_V2.clone(),
590 SEQUENCE_NUMBER.clone(),
591 FILE_SEQUENCE_NUMBER.clone(),
592 Arc::new(NestedField::required(
593 2,
594 "data_file",
595 Type::Struct(StructType::new(data_file_fields_v2(partition_type))),
596 )),
597 ];
598 let schema = Schema::builder().with_fields(fields).build()?;
599 schema_to_avro_schema("manifest_entry", &schema)
600}
601
602fn data_file_fields_v1(partition_type: &StructType) -> Vec<NestedFieldRef> {
603 vec![
604 FILE_PATH.clone(),
605 FILE_FORMAT.clone(),
606 Arc::new(NestedField::required(
607 102,
608 "partition",
609 Type::Struct(partition_type.clone()),
610 )),
611 RECORD_COUNT.clone(),
612 FILE_SIZE_IN_BYTES.clone(),
613 BLOCK_SIZE_IN_BYTES.clone(),
614 COLUMN_SIZES.clone(),
615 VALUE_COUNTS.clone(),
616 NULL_VALUE_COUNTS.clone(),
617 NAN_VALUE_COUNTS.clone(),
618 LOWER_BOUNDS.clone(),
619 UPPER_BOUNDS.clone(),
620 KEY_METADATA.clone(),
621 SPLIT_OFFSETS.clone(),
622 SORT_ORDER_ID.clone(),
623 ]
624}
625
626pub(super) fn data_file_schema_v1(partition_type: &StructType) -> Result<AvroSchema> {
627 let schema = Schema::builder()
628 .with_fields(data_file_fields_v1(partition_type))
629 .build()?;
630 schema_to_avro_schema("data_file", &schema)
631}
632
633pub(super) fn manifest_schema_v1(partition_type: &StructType) -> Result<AvroSchema> {
634 let fields = vec![
635 STATUS.clone(),
636 SNAPSHOT_ID_V1.clone(),
637 Arc::new(NestedField::required(
638 2,
639 "data_file",
640 Type::Struct(StructType::new(data_file_fields_v1(partition_type))),
641 )),
642 ];
643 let schema = Schema::builder().with_fields(fields).build()?;
644 schema_to_avro_schema("manifest_entry", &schema)
645}