use std::sync::Arc;
use apache_avro::Schema as AvroSchema;
use once_cell::sync::Lazy;
use typed_builder::TypedBuilder;
use crate::avro::schema_to_avro_schema;
use crate::error::Result;
use crate::spec::{
DataContentType, DataFile, ListType, ManifestFile, MapType, NestedField, NestedFieldRef,
PrimitiveType, Schema, StructType, Type, INITIAL_SEQUENCE_NUMBER,
};
use crate::{Error, ErrorKind};
/// Shared, reference-counted handle to a [`ManifestEntry`].
pub type ManifestEntryRef = Arc<ManifestEntry>;
/// One entry of a manifest: a [`DataFile`] together with its [`ManifestStatus`],
/// snapshot id, and sequence numbers.
///
/// Construct it with the `TypedBuilder`-generated `ManifestEntry::builder()`.
/// `status` and `data_file` have no builder default and must be set; the three
/// `Option<i64>` fields default to `None`. Their setters accept a plain `i64`
/// (wrapped in `Some`), while the `*_opt` fallback setters accept an
/// `Option<i64>` directly.
#[derive(Debug, PartialEq, Eq, Clone, TypedBuilder)]
pub struct ManifestEntry {
/// Whether this entry is existing, added, or deleted.
pub status: ManifestStatus,
/// Snapshot id of the entry. When `None`, `inherit_data` fills it from the
/// manifest's `added_snapshot_id`.
#[builder(default, setter(strip_option(fallback = snapshot_id_opt)))]
pub snapshot_id: Option<i64>,
/// Sequence number of the entry (presumably the Iceberg *data* sequence
/// number — confirm). When `None`, `inherit_data` may fill it from the manifest.
#[builder(default, setter(strip_option(fallback = sequence_number_opt)))]
pub sequence_number: Option<i64>,
/// File sequence number of the entry. When `None`, `inherit_data` may fill it
/// from the manifest.
#[builder(default, setter(strip_option(fallback = file_sequence_number_opt)))]
pub file_sequence_number: Option<i64>,
/// The file this entry describes.
pub data_file: DataFile,
}
impl ManifestEntry {
    /// Returns `true` when the entry's status is `Added` or `Existing`, and
    /// `false` when it is `Deleted`.
    pub fn is_alive(&self) -> bool {
        match self.status {
            ManifestStatus::Added | ManifestStatus::Existing => true,
            ManifestStatus::Deleted => false,
        }
    }

    /// Returns the entry's [`ManifestStatus`].
    pub fn status(&self) -> ManifestStatus {
        self.status
    }

    /// Content type recorded on the underlying data file.
    #[inline]
    pub fn content_type(&self) -> DataContentType {
        self.data_file().content
    }

    /// File format of the underlying data file.
    #[inline]
    pub fn file_format(&self) -> DataFileFormat {
        self.data_file().file_format
    }

    /// Path of the underlying data file.
    #[inline]
    pub fn file_path(&self) -> &str {
        &self.data_file().file_path
    }

    /// Number of records in the underlying data file.
    #[inline]
    pub fn record_count(&self) -> u64 {
        self.data_file().record_count
    }

    /// Fills in unset metadata from the manifest-list entry (`snapshot_entry`)
    /// that references this entry's manifest.
    ///
    /// * `snapshot_id`: inherited from `snapshot_entry.added_snapshot_id`
    ///   whenever it is `None`.
    /// * `sequence_number` / `file_sequence_number`: inherited from
    ///   `snapshot_entry.sequence_number` when `None` AND either this entry's
    ///   status is `Added` or the manifest's sequence number equals
    ///   `INITIAL_SEQUENCE_NUMBER`.
    ///
    /// Values that are already present are never overwritten.
    pub(crate) fn inherit_data(&mut self, snapshot_entry: &ManifestFile) {
        let manifest_sequence_number = snapshot_entry.sequence_number;
        // Both sequence numbers share the same eligibility rule, so decide it once.
        let inherits_sequence_numbers = matches!(self.status, ManifestStatus::Added)
            || manifest_sequence_number == INITIAL_SEQUENCE_NUMBER;

        // `Option::or` keeps an existing value and only falls back when `None`.
        self.snapshot_id = self.snapshot_id.or(Some(snapshot_entry.added_snapshot_id));

        if inherits_sequence_numbers {
            self.sequence_number = self.sequence_number.or(Some(manifest_sequence_number));
            self.file_sequence_number = self
                .file_sequence_number
                .or(Some(manifest_sequence_number));
        }
    }

    /// Snapshot id of the entry, if known.
    #[inline]
    pub fn snapshot_id(&self) -> Option<i64> {
        self.snapshot_id
    }

    /// Sequence number of the entry, if known.
    #[inline]
    pub fn sequence_number(&self) -> Option<i64> {
        self.sequence_number
    }

    /// Size of the underlying data file in bytes.
    #[inline]
    pub fn file_size_in_bytes(&self) -> u64 {
        self.data_file().file_size_in_bytes
    }

    /// Borrows the underlying [`DataFile`].
    #[inline]
    pub fn data_file(&self) -> &DataFile {
        &self.data_file
    }
}
/// Status of a [`ManifestEntry`], stored as an integer code.
///
/// The explicit discriminants are the same codes that
/// `ManifestStatus::try_from(i32)` accepts (0, 1, 2).
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum ManifestStatus {
/// Code 0. Treated as live by `ManifestEntry::is_alive`.
Existing = 0,
/// Code 1. Treated as live by `ManifestEntry::is_alive`; entries with this
/// status are eligible to inherit sequence numbers in `inherit_data`.
Added = 1,
/// Code 2. Treated as not live by `ManifestEntry::is_alive`.
Deleted = 2,
}
impl TryFrom<i32> for ManifestStatus {
type Error = Error;
fn try_from(v: i32) -> Result<ManifestStatus> {
match v {
0 => Ok(ManifestStatus::Existing),
1 => Ok(ManifestStatus::Added),
2 => Ok(ManifestStatus::Deleted),
_ => Err(Error::new(
ErrorKind::DataInvalid,
format!("manifest status {} is invalid", v),
)),
}
}
}
use super::DataFileFormat;
/// Manifest-entry field id 0, `status`: required `int`.
static STATUS: Lazy<NestedFieldRef> = Lazy::new(|| {
    let field_type = Type::Primitive(PrimitiveType::Int);
    Arc::new(NestedField::required(0, "status", field_type))
});
/// Manifest-entry field id 1, `snapshot_id`: required `long` (v1 layout).
static SNAPSHOT_ID_V1: Lazy<NestedFieldRef> = Lazy::new(|| {
    let field_type = Type::Primitive(PrimitiveType::Long);
    Arc::new(NestedField::required(1, "snapshot_id", field_type))
});
/// Manifest-entry field id 1, `snapshot_id`: optional `long` (v2 layout).
static SNAPSHOT_ID_V2: Lazy<NestedFieldRef> = Lazy::new(|| {
    let field_type = Type::Primitive(PrimitiveType::Long);
    Arc::new(NestedField::optional(1, "snapshot_id", field_type))
});
/// Manifest-entry field id 3, `sequence_number`: optional `long`.
static SEQUENCE_NUMBER: Lazy<NestedFieldRef> = Lazy::new(|| {
    let field_type = Type::Primitive(PrimitiveType::Long);
    Arc::new(NestedField::optional(3, "sequence_number", field_type))
});
/// Manifest-entry field id 4, `file_sequence_number`: optional `long`.
static FILE_SEQUENCE_NUMBER: Lazy<NestedFieldRef> = Lazy::new(|| {
    let field_type = Type::Primitive(PrimitiveType::Long);
    Arc::new(NestedField::optional(4, "file_sequence_number", field_type))
});
/// Data-file field id 134, `content`: required `int` (used only in the v2 layout).
static CONTENT: Lazy<NestedFieldRef> = Lazy::new(|| {
    let field_type = Type::Primitive(PrimitiveType::Int);
    Arc::new(NestedField::required(134, "content", field_type))
});
/// Data-file field id 100, `file_path`: required `string`.
static FILE_PATH: Lazy<NestedFieldRef> = Lazy::new(|| {
    let field_type = Type::Primitive(PrimitiveType::String);
    Arc::new(NestedField::required(100, "file_path", field_type))
});
/// Data-file field id 101, `file_format`: required `string`.
static FILE_FORMAT: Lazy<NestedFieldRef> = Lazy::new(|| {
    let field_type = Type::Primitive(PrimitiveType::String);
    Arc::new(NestedField::required(101, "file_format", field_type))
});
/// Data-file field id 103, `record_count`: required `long`.
static RECORD_COUNT: Lazy<NestedFieldRef> = Lazy::new(|| {
    let field_type = Type::Primitive(PrimitiveType::Long);
    Arc::new(NestedField::required(103, "record_count", field_type))
});
/// Data-file field id 104, `file_size_in_bytes`: required `long`.
static FILE_SIZE_IN_BYTES: Lazy<NestedFieldRef> = Lazy::new(|| {
    let field_type = Type::Primitive(PrimitiveType::Long);
    Arc::new(NestedField::required(104, "file_size_in_bytes", field_type))
});
/// Data-file field id 105, `block_size_in_bytes`: required `long` (used only in
/// the v1 layout).
static BLOCK_SIZE_IN_BYTES: Lazy<NestedFieldRef> = Lazy::new(|| {
    let field_type = Type::Primitive(PrimitiveType::Long);
    Arc::new(NestedField::required(105, "block_size_in_bytes", field_type))
});
/// Data-file field id 108, `column_sizes`: optional map with `int` keys
/// (id 117) and `long` values (id 118).
static COLUMN_SIZES: Lazy<NestedFieldRef> = Lazy::new(|| {
    let key = NestedField::required(117, "key", Type::Primitive(PrimitiveType::Int));
    let value = NestedField::required(118, "value", Type::Primitive(PrimitiveType::Long));
    let map = MapType {
        key_field: Arc::new(key),
        value_field: Arc::new(value),
    };
    Arc::new(NestedField::optional(108, "column_sizes", Type::Map(map)))
});
/// Data-file field id 109, `value_counts`: optional map with `int` keys
/// (id 119) and `long` values (id 120).
static VALUE_COUNTS: Lazy<NestedFieldRef> = Lazy::new(|| {
    let key = NestedField::required(119, "key", Type::Primitive(PrimitiveType::Int));
    let value = NestedField::required(120, "value", Type::Primitive(PrimitiveType::Long));
    let map = MapType {
        key_field: Arc::new(key),
        value_field: Arc::new(value),
    };
    Arc::new(NestedField::optional(109, "value_counts", Type::Map(map)))
});
/// Data-file field id 110, `null_value_counts`: optional map with `int` keys
/// (id 121) and `long` values (id 122).
static NULL_VALUE_COUNTS: Lazy<NestedFieldRef> = Lazy::new(|| {
    let key = NestedField::required(121, "key", Type::Primitive(PrimitiveType::Int));
    let value = NestedField::required(122, "value", Type::Primitive(PrimitiveType::Long));
    let map = MapType {
        key_field: Arc::new(key),
        value_field: Arc::new(value),
    };
    Arc::new(NestedField::optional(110, "null_value_counts", Type::Map(map)))
});
/// Data-file field id 137, `nan_value_counts`: optional map with `int` keys
/// (id 138) and `long` values (id 139).
static NAN_VALUE_COUNTS: Lazy<NestedFieldRef> = Lazy::new(|| {
    let key = NestedField::required(138, "key", Type::Primitive(PrimitiveType::Int));
    let value = NestedField::required(139, "value", Type::Primitive(PrimitiveType::Long));
    let map = MapType {
        key_field: Arc::new(key),
        value_field: Arc::new(value),
    };
    Arc::new(NestedField::optional(137, "nan_value_counts", Type::Map(map)))
});
/// Data-file field id 125, `lower_bounds`: optional map with `int` keys
/// (id 126) and `binary` values (id 127).
static LOWER_BOUNDS: Lazy<NestedFieldRef> = Lazy::new(|| {
    let key = NestedField::required(126, "key", Type::Primitive(PrimitiveType::Int));
    let value = NestedField::required(127, "value", Type::Primitive(PrimitiveType::Binary));
    let map = MapType {
        key_field: Arc::new(key),
        value_field: Arc::new(value),
    };
    Arc::new(NestedField::optional(125, "lower_bounds", Type::Map(map)))
});
/// Data-file field id 128, `upper_bounds`: optional map with `int` keys
/// (id 129) and `binary` values (id 130).
static UPPER_BOUNDS: Lazy<NestedFieldRef> = Lazy::new(|| {
    let key = NestedField::required(129, "key", Type::Primitive(PrimitiveType::Int));
    let value = NestedField::required(130, "value", Type::Primitive(PrimitiveType::Binary));
    let map = MapType {
        key_field: Arc::new(key),
        value_field: Arc::new(value),
    };
    Arc::new(NestedField::optional(128, "upper_bounds", Type::Map(map)))
});
/// Data-file field id 131, `key_metadata`: optional `binary`.
static KEY_METADATA: Lazy<NestedFieldRef> = Lazy::new(|| {
    let field_type = Type::Primitive(PrimitiveType::Binary);
    Arc::new(NestedField::optional(131, "key_metadata", field_type))
});
/// Data-file field id 132, `split_offsets`: optional list of `long`
/// (element id 133).
static SPLIT_OFFSETS: Lazy<NestedFieldRef> = Lazy::new(|| {
    let element = NestedField::required(133, "element", Type::Primitive(PrimitiveType::Long));
    let list = ListType {
        element_field: Arc::new(element),
    };
    Arc::new(NestedField::optional(132, "split_offsets", Type::List(list)))
});
/// Data-file field id 135, `equality_ids`: optional list of `int`
/// (element id 136); used only in the v2 layout.
static EQUALITY_IDS: Lazy<NestedFieldRef> = Lazy::new(|| {
    let element = NestedField::required(136, "element", Type::Primitive(PrimitiveType::Int));
    let list = ListType {
        element_field: Arc::new(element),
    };
    Arc::new(NestedField::optional(135, "equality_ids", Type::List(list)))
});
/// Data-file field id 140, `sort_order_id`: optional `int`.
static SORT_ORDER_ID: Lazy<NestedFieldRef> = Lazy::new(|| {
    let field_type = Type::Primitive(PrimitiveType::Int);
    Arc::new(NestedField::optional(140, "sort_order_id", field_type))
});
/// Ordered field list of the v2 `data_file` struct.
///
/// `partition_type` becomes the type of the required `partition` field
/// (id 102); every other field is one of the shared statics above.
fn data_file_fields_v2(partition_type: &StructType) -> Vec<NestedFieldRef> {
    // The only per-call field: its type depends on the table's partition spec.
    let partition = Arc::new(NestedField::required(
        102,
        "partition",
        Type::Struct(partition_type.clone()),
    ));

    vec![
        CONTENT.clone(),
        FILE_PATH.clone(),
        FILE_FORMAT.clone(),
        partition,
        RECORD_COUNT.clone(),
        FILE_SIZE_IN_BYTES.clone(),
        COLUMN_SIZES.clone(),
        VALUE_COUNTS.clone(),
        NULL_VALUE_COUNTS.clone(),
        NAN_VALUE_COUNTS.clone(),
        LOWER_BOUNDS.clone(),
        UPPER_BOUNDS.clone(),
        KEY_METADATA.clone(),
        SPLIT_OFFSETS.clone(),
        EQUALITY_IDS.clone(),
        SORT_ORDER_ID.clone(),
    ]
}
/// Avro schema (record name `"data_file"`) for a standalone v2 data-file
/// record partitioned by `partition_type`.
///
/// # Errors
/// Propagates errors from building the Iceberg [`Schema`] and from
/// `schema_to_avro_schema`.
pub(super) fn data_file_schema_v2(partition_type: &StructType) -> Result<AvroSchema> {
    let fields = data_file_fields_v2(partition_type);
    let iceberg_schema = Schema::builder().with_fields(fields).build()?;
    schema_to_avro_schema("data_file", &iceberg_schema)
}
/// Avro schema (record name `"manifest_entry"`) for a v2 manifest entry.
///
/// The entry fields are `status`, an optional `snapshot_id`,
/// `sequence_number`, `file_sequence_number`, and a required `data_file`
/// struct (id 2) built from the v2 data-file fields for `partition_type`.
///
/// # Errors
/// Propagates errors from building the Iceberg [`Schema`] and from
/// `schema_to_avro_schema`.
pub(super) fn manifest_schema_v2(partition_type: &StructType) -> Result<AvroSchema> {
    let data_file_struct = StructType::new(data_file_fields_v2(partition_type));
    let data_file = Arc::new(NestedField::required(
        2,
        "data_file",
        Type::Struct(data_file_struct),
    ));

    let schema = Schema::builder()
        .with_fields(vec![
            STATUS.clone(),
            SNAPSHOT_ID_V2.clone(),
            SEQUENCE_NUMBER.clone(),
            FILE_SEQUENCE_NUMBER.clone(),
            data_file,
        ])
        .build()?;
    schema_to_avro_schema("manifest_entry", &schema)
}
/// Ordered field list of the v1 `data_file` struct.
///
/// Compared with the v2 list, this one has no `content` or `equality_ids` field
/// and includes `block_size_in_bytes`. `partition_type` becomes the type of the
/// required `partition` field (id 102).
fn data_file_fields_v1(partition_type: &StructType) -> Vec<NestedFieldRef> {
    // The only per-call field: its type depends on the table's partition spec.
    let partition = Arc::new(NestedField::required(
        102,
        "partition",
        Type::Struct(partition_type.clone()),
    ));

    vec![
        FILE_PATH.clone(),
        FILE_FORMAT.clone(),
        partition,
        RECORD_COUNT.clone(),
        FILE_SIZE_IN_BYTES.clone(),
        BLOCK_SIZE_IN_BYTES.clone(),
        COLUMN_SIZES.clone(),
        VALUE_COUNTS.clone(),
        NULL_VALUE_COUNTS.clone(),
        NAN_VALUE_COUNTS.clone(),
        LOWER_BOUNDS.clone(),
        UPPER_BOUNDS.clone(),
        KEY_METADATA.clone(),
        SPLIT_OFFSETS.clone(),
        SORT_ORDER_ID.clone(),
    ]
}
/// Avro schema (record name `"data_file"`) for a standalone v1 data-file
/// record partitioned by `partition_type`.
///
/// # Errors
/// Propagates errors from building the Iceberg [`Schema`] and from
/// `schema_to_avro_schema`.
pub(super) fn data_file_schema_v1(partition_type: &StructType) -> Result<AvroSchema> {
    let fields = data_file_fields_v1(partition_type);
    let iceberg_schema = Schema::builder().with_fields(fields).build()?;
    schema_to_avro_schema("data_file", &iceberg_schema)
}
/// Avro schema (record name `"manifest_entry"`) for a v1 manifest entry.
///
/// The entry fields are `status`, a required `snapshot_id`, and a required
/// `data_file` struct (id 2) built from the v1 data-file fields for
/// `partition_type`. v1 entries carry no sequence-number fields.
///
/// # Errors
/// Propagates errors from building the Iceberg [`Schema`] and from
/// `schema_to_avro_schema`.
pub(super) fn manifest_schema_v1(partition_type: &StructType) -> Result<AvroSchema> {
    let data_file_struct = StructType::new(data_file_fields_v1(partition_type));
    let data_file = Arc::new(NestedField::required(
        2,
        "data_file",
        Type::Struct(data_file_struct),
    ));

    let schema = Schema::builder()
        .with_fields(vec![STATUS.clone(), SNAPSHOT_ID_V1.clone(), data_file])
        .build()?;
    schema_to_avro_schema("manifest_entry", &schema)
}