use std::cmp::min;
use std::collections::HashMap;
use std::io::{Read, Write};
use std::str::FromStr;
use std::sync::Arc;
use apache_avro::{from_value, to_value, Reader as AvroReader, Writer as AvroWriter};
use bytes::Bytes;
use itertools::Itertools;
use serde_derive::{Deserialize, Serialize};
use serde_json::to_vec;
use serde_with::{DeserializeFromStr, SerializeDisplay};
use typed_builder::TypedBuilder;
use self::_const_schema::{manifest_schema_v1, manifest_schema_v2};
use super::{
Datum, FieldSummary, FormatVersion, ManifestContentType, ManifestFile, PartitionSpec,
PrimitiveLiteral, PrimitiveType, Schema, SchemaId, SchemaRef, Struct, StructType,
INITIAL_SEQUENCE_NUMBER, UNASSIGNED_SEQUENCE_NUMBER, UNASSIGNED_SNAPSHOT_ID,
};
use crate::error::Result;
use crate::io::OutputFile;
use crate::spec::PartitionField;
use crate::{Error, ErrorKind};
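/// A manifest contains metadata and a list of entries.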
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Manifest {
metadata: ManifestMetadata,
entries: Vec<ManifestEntryRef>,
}
impl Manifest {
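/// Parse manifest metadata and entries from the raw bytes of an avro manifest file.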
pub(crate) fn try_from_avro_bytes(bs: &[u8]) -> Result<(ManifestMetadata, Vec<ManifestEntry>)> {
let reader = AvroReader::new(bs)?;
let meta = reader.user_metadata();
let metadata = ManifestMetadata::parse(meta)?;
let partition_type = metadata.partition_spec.partition_type(&metadata.schema)?;
let entries = match metadata.format_version {
FormatVersion::V1 => {
let schema = manifest_schema_v1(&partition_type)?;
let reader = AvroReader::with_schema(&schema, bs)?;
reader
.into_iter()
.map(|value| {
from_value::<_serde::ManifestEntryV1>(&value?)?
.try_into(&partition_type, &metadata.schema)
})
.collect::<Result<Vec<_>>>()?
}
FormatVersion::V2 => {
let schema = manifest_schema_v2(&partition_type)?;
let reader = AvroReader::with_schema(&schema, bs)?;
reader
.into_iter()
.map(|value| {
from_value::<_serde::ManifestEntryV2>(&value?)?
.try_into(&partition_type, &metadata.schema)
})
.collect::<Result<Vec<_>>>()?
}
};
Ok((metadata, entries))
}
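/// Parse a [`Manifest`] from the raw bytes of an avro manifest file.
///
/// A minimal usage sketch (not compiled as a doc-test); `manifest.avro` is a
/// hypothetical path to a manifest previously written by a [`ManifestWriter`]:
///
/// ```ignore
/// let bs = std::fs::read("manifest.avro")?; // hypothetical path
/// let manifest = Manifest::parse_avro(&bs)?;
/// for entry in manifest.entries() {
///     println!("{} ({} rows)", entry.file_path(), entry.record_count());
/// }
/// ```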
pub fn parse_avro(bs: &[u8]) -> Result<Self> {
let (metadata, entries) = Self::try_from_avro_bytes(bs)?;
Ok(Self::new(metadata, entries))
}
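/// Entries slice.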
pub fn entries(&self) -> &[ManifestEntryRef] {
&self.entries
}
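/// Consume this Manifest, returning its constituent parts.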
pub fn into_parts(self) -> (Vec<ManifestEntryRef>, ManifestMetadata) {
let Self { entries, metadata } = self;
(entries, metadata)
}
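/// Constructor from [`ManifestMetadata`] and [`ManifestEntry`]s.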
pub fn new(metadata: ManifestMetadata, entries: Vec<ManifestEntry>) -> Self {
Self {
metadata,
entries: entries.into_iter().map(Arc::new).collect(),
}
}
}
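/// The builder used to create a [`ManifestWriter`].
///
/// A minimal usage sketch (not compiled as a doc-test); `output_file`,
/// `snapshot_id`, `schema`, and `partition_spec` are assumed to be in scope,
/// e.g. obtained from a `FileIO` and table metadata:
///
/// ```ignore
/// let mut writer = ManifestWriterBuilder::new(
///     output_file,       // where the avro manifest is written
///     Some(snapshot_id), // snapshot the new entries belong to
///     vec![],            // key metadata, left empty here
///     schema,
///     partition_spec,
/// )
/// .build_v2_data();
/// ```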
pub struct ManifestWriterBuilder {
output: OutputFile,
snapshot_id: Option<i64>,
key_metadata: Vec<u8>,
schema: SchemaRef,
partition_spec: PartitionSpec,
}
impl ManifestWriterBuilder {
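/// Create a new [`ManifestWriterBuilder`].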
pub fn new(
output: OutputFile,
snapshot_id: Option<i64>,
key_metadata: Vec<u8>,
schema: SchemaRef,
partition_spec: PartitionSpec,
) -> Self {
Self {
output,
snapshot_id,
key_metadata,
schema,
partition_spec,
}
}
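/// Build a [`ManifestWriter`] for format version 1.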
pub fn build_v1(self) -> ManifestWriter {
let metadata = ManifestMetadata::builder()
.schema_id(self.schema.schema_id())
.schema(self.schema)
.partition_spec(self.partition_spec)
.format_version(FormatVersion::V1)
.content(ManifestContentType::Data)
.build();
ManifestWriter::new(self.output, self.snapshot_id, self.key_metadata, metadata)
}
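/// Build a [`ManifestWriter`] for format version 2, with content type [`ManifestContentType::Data`].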
pub fn build_v2_data(self) -> ManifestWriter {
let metadata = ManifestMetadata::builder()
.schema_id(self.schema.schema_id())
.schema(self.schema)
.partition_spec(self.partition_spec)
.format_version(FormatVersion::V2)
.content(ManifestContentType::Data)
.build();
ManifestWriter::new(self.output, self.snapshot_id, self.key_metadata, metadata)
}
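/// Build a [`ManifestWriter`] for format version 2, with content type [`ManifestContentType::Deletes`].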
pub fn build_v2_deletes(self) -> ManifestWriter {
let metadata = ManifestMetadata::builder()
.schema_id(self.schema.schema_id())
.schema(self.schema)
.partition_spec(self.partition_spec)
.format_version(FormatVersion::V2)
.content(ManifestContentType::Deletes)
.build();
ManifestWriter::new(self.output, self.snapshot_id, self.key_metadata, metadata)
}
}
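/// A manifest writer. Buffers entries, tracks file/row counts and partition
/// summaries, and writes everything out as a single avro manifest file.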
pub struct ManifestWriter {
output: OutputFile,
snapshot_id: Option<i64>,
added_files: u32,
added_rows: u64,
existing_files: u32,
existing_rows: u64,
deleted_files: u32,
deleted_rows: u64,
min_seq_num: Option<i64>,
key_metadata: Vec<u8>,
manifest_entries: Vec<ManifestEntry>,
metadata: ManifestMetadata,
}
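/// Accumulates a [`FieldSummary`] (contains-null/NaN flags plus lower and
/// upper bounds) for a single partition field as entries are added.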
struct PartitionFieldStats {
partition_type: PrimitiveType,
summary: FieldSummary,
}
impl PartitionFieldStats {
pub(crate) fn new(partition_type: PrimitiveType) -> Self {
Self {
partition_type,
summary: FieldSummary::default(),
}
}
pub(crate) fn update(&mut self, value: Option<PrimitiveLiteral>) -> Result<()> {
let Some(value) = value else {
self.summary.contains_null = true;
return Ok(());
};
if !self.partition_type.compatible(&value) {
return Err(Error::new(
ErrorKind::DataInvalid,
"value is not compatitable with type",
));
}
let value = Datum::new(self.partition_type.clone(), value);
if value.is_nan() {
self.summary.contains_nan = Some(true);
return Ok(());
}
self.summary.lower_bound = Some(self.summary.lower_bound.take().map_or(
value.clone(),
|original| {
if value < original {
value.clone()
} else {
original
}
},
));
self.summary.upper_bound = Some(self.summary.upper_bound.take().map_or(
value.clone(),
|original| {
if value > original {
value
} else {
original
}
},
));
Ok(())
}
pub(crate) fn finish(mut self) -> FieldSummary {
self.summary.contains_nan = self.summary.contains_nan.or(Some(false));
self.summary
}
}
impl ManifestWriter {
pub(crate) fn new(
output: OutputFile,
snapshot_id: Option<i64>,
key_metadata: Vec<u8>,
metadata: ManifestMetadata,
) -> Self {
Self {
output,
snapshot_id,
added_files: 0,
added_rows: 0,
existing_files: 0,
existing_rows: 0,
deleted_files: 0,
deleted_rows: 0,
min_seq_num: None,
key_metadata,
manifest_entries: Vec::new(),
metadata,
}
}
fn construct_partition_summaries(
&mut self,
partition_type: &StructType,
) -> Result<Vec<FieldSummary>> {
let mut field_stats: Vec<_> = partition_type
.fields()
.iter()
.map(|f| PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone()))
.collect();
for partition in self.manifest_entries.iter().map(|e| &e.data_file.partition) {
for (literal, stat) in partition.iter().zip_eq(field_stats.iter_mut()) {
let primitive_literal = literal.map(|v| v.as_primitive_literal().unwrap());
stat.update(primitive_literal)?;
}
}
Ok(field_stats.into_iter().map(|stat| stat.finish()).collect())
}
fn check_data_file(&self, data_file: &DataFile) -> Result<()> {
match self.metadata.content {
ManifestContentType::Data => {
if data_file.content != DataContentType::Data {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Content type of entry is {:?}, expected DataContentType::Data",
data_file.content
),
));
}
}
ManifestContentType::Deletes => {
if data_file.content != DataContentType::EqualityDeletes
&& data_file.content != DataContentType::PositionDeletes
{
return Err(Error::new(
ErrorKind::DataInvalid,
format!("Content type of entry {:?} should have DataContentType::EqualityDeletes or DataContentType::PositionDeletes", data_file.content),
));
}
}
}
Ok(())
}
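/// Add a new manifest entry. The entry is forced to [`ManifestStatus::Added`],
/// its snapshot id is taken from this writer, and unassigned sequence numbers
/// are cleared so they can be inherited at commit time.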
pub(crate) fn add_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
self.check_data_file(&entry.data_file)?;
entry.status = ManifestStatus::Added;
entry.snapshot_id = self.snapshot_id;
entry.file_sequence_number = None;
// Sequence numbers that were never assigned (missing or negative) are
// cleared so they can be inherited when the manifest is committed.
if !entry.sequence_number().is_some_and(|n| n >= 0) {
entry.sequence_number = None;
}
self.add_entry_inner(entry)?;
Ok(())
}
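/// Add a data file as a new [`ManifestStatus::Added`] entry. A negative
/// `sequence_number` means "not yet assigned" and is stored as `None`.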
pub fn add_file(&mut self, data_file: DataFile, sequence_number: i64) -> Result<()> {
self.check_data_file(&data_file)?;
let entry = ManifestEntry {
status: ManifestStatus::Added,
snapshot_id: self.snapshot_id,
sequence_number: (sequence_number >= 0).then_some(sequence_number),
file_sequence_number: None,
data_file,
};
self.add_entry_inner(entry)?;
Ok(())
}
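/// Add a manifest entry whose data file is being deleted, forcing its status
/// to [`ManifestStatus::Deleted`].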
#[allow(dead_code)]
pub(crate) fn add_delete_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
self.check_data_file(&entry.data_file)?;
entry.status = ManifestStatus::Deleted;
entry.snapshot_id = self.snapshot_id;
self.add_entry_inner(entry)?;
Ok(())
}
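/// Add a data file as a [`ManifestStatus::Deleted`] entry with explicit
/// sequence numbers.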
pub fn add_delete_file(
&mut self,
data_file: DataFile,
sequence_number: i64,
file_sequence_number: Option<i64>,
) -> Result<()> {
self.check_data_file(&data_file)?;
let entry = ManifestEntry {
status: ManifestStatus::Deleted,
snapshot_id: self.snapshot_id,
sequence_number: Some(sequence_number),
file_sequence_number,
data_file,
};
self.add_entry_inner(entry)?;
Ok(())
}
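/// Add a manifest entry carried over from a previous manifest, forcing its
/// status to [`ManifestStatus::Existing`].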
#[allow(dead_code)]
pub(crate) fn add_existing_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
self.check_data_file(&entry.data_file)?;
entry.status = ManifestStatus::Existing;
self.add_entry_inner(entry)?;
Ok(())
}
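/// Add a data file as a [`ManifestStatus::Existing`] entry with explicit
/// snapshot id and sequence numbers.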
pub fn add_existing_file(
&mut self,
data_file: DataFile,
snapshot_id: i64,
sequence_number: i64,
file_sequence_number: Option<i64>,
) -> Result<()> {
self.check_data_file(&data_file)?;
let entry = ManifestEntry {
status: ManifestStatus::Existing,
snapshot_id: Some(snapshot_id),
sequence_number: Some(sequence_number),
file_sequence_number,
data_file,
};
self.add_entry_inner(entry)?;
Ok(())
}
fn add_entry_inner(&mut self, entry: ManifestEntry) -> Result<()> {
if (entry.status == ManifestStatus::Deleted || entry.status == ManifestStatus::Existing)
&& (entry.sequence_number.is_none() || entry.file_sequence_number.is_none())
{
return Err(Error::new(
ErrorKind::DataInvalid,
"Manifest entry with status Existing or Deleted should have sequence number",
));
}
match entry.status {
ManifestStatus::Added => {
self.added_files += 1;
self.added_rows += entry.data_file.record_count;
}
ManifestStatus::Deleted => {
self.deleted_files += 1;
self.deleted_rows += entry.data_file.record_count;
}
ManifestStatus::Existing => {
self.existing_files += 1;
self.existing_rows += entry.data_file.record_count;
}
}
if entry.is_alive() {
if let Some(seq_num) = entry.sequence_number {
self.min_seq_num = Some(self.min_seq_num.map_or(seq_num, |v| min(v, seq_num)));
}
}
self.manifest_entries.push(entry);
Ok(())
}
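/// Write all buffered entries to the output file and return the
/// [`ManifestFile`] describing it. The manifest's own sequence number is
/// written as [`UNASSIGNED_SEQUENCE_NUMBER`] and assigned at commit time.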
pub async fn write_manifest_file(mut self) -> Result<ManifestFile> {
let partition_type = self
.metadata
.partition_spec
.partition_type(&self.metadata.schema)?;
let table_schema = &self.metadata.schema;
let avro_schema = match self.metadata.format_version {
FormatVersion::V1 => manifest_schema_v1(&partition_type)?,
FormatVersion::V2 => manifest_schema_v2(&partition_type)?,
};
let mut avro_writer = AvroWriter::new(&avro_schema, Vec::new());
avro_writer.add_user_metadata(
"schema".to_string(),
to_vec(table_schema).map_err(|err| {
Error::new(ErrorKind::DataInvalid, "Fail to serialize table schema")
.with_source(err)
})?,
)?;
avro_writer.add_user_metadata(
"schema-id".to_string(),
table_schema.schema_id().to_string(),
)?;
avro_writer.add_user_metadata(
"partition-spec".to_string(),
to_vec(&self.metadata.partition_spec.fields()).map_err(|err| {
Error::new(ErrorKind::DataInvalid, "Fail to serialize partition spec")
.with_source(err)
})?,
)?;
avro_writer.add_user_metadata(
"partition-spec-id".to_string(),
self.metadata.partition_spec.spec_id().to_string(),
)?;
avro_writer.add_user_metadata(
"format-version".to_string(),
(self.metadata.format_version as u8).to_string(),
)?;
if self.metadata.format_version == FormatVersion::V2 {
avro_writer
.add_user_metadata("content".to_string(), self.metadata.content.to_string())?;
}
let partition_summary = self.construct_partition_summaries(&partition_type)?;
for entry in std::mem::take(&mut self.manifest_entries) {
let value = match self.metadata.format_version {
FormatVersion::V1 => {
to_value(_serde::ManifestEntryV1::try_from(entry, &partition_type)?)?
.resolve(&avro_schema)?
}
FormatVersion::V2 => {
to_value(_serde::ManifestEntryV2::try_from(entry, &partition_type)?)?
.resolve(&avro_schema)?
}
};
avro_writer.append(value)?;
}
let content = avro_writer.into_inner()?;
let length = content.len();
self.output.write(Bytes::from(content)).await?;
Ok(ManifestFile {
manifest_path: self.output.location().to_string(),
manifest_length: length as i64,
partition_spec_id: self.metadata.partition_spec.spec_id(),
content: self.metadata.content,
sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
min_sequence_number: self.min_seq_num.unwrap_or(UNASSIGNED_SEQUENCE_NUMBER),
added_snapshot_id: self.snapshot_id.unwrap_or(UNASSIGNED_SNAPSHOT_ID),
added_files_count: Some(self.added_files),
existing_files_count: Some(self.existing_files),
deleted_files_count: Some(self.deleted_files),
added_rows_count: Some(self.added_rows),
existing_rows_count: Some(self.existing_rows),
deleted_rows_count: Some(self.deleted_rows),
partitions: partition_summary,
key_metadata: self.key_metadata,
})
}
}
mod _const_schema {
use std::sync::Arc;
use apache_avro::Schema as AvroSchema;
use once_cell::sync::Lazy;
use crate::avro::schema_to_avro_schema;
use crate::spec::{
ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, Schema, StructType, Type,
};
use crate::Error;
static STATUS: Lazy<NestedFieldRef> = {
Lazy::new(|| {
Arc::new(NestedField::required(
0,
"status",
Type::Primitive(PrimitiveType::Int),
))
})
};
static SNAPSHOT_ID_V1: Lazy<NestedFieldRef> = {
Lazy::new(|| {
Arc::new(NestedField::required(
1,
"snapshot_id",
Type::Primitive(PrimitiveType::Long),
))
})
};
static SNAPSHOT_ID_V2: Lazy<NestedFieldRef> = {
Lazy::new(|| {
Arc::new(NestedField::optional(
1,
"snapshot_id",
Type::Primitive(PrimitiveType::Long),
))
})
};
static SEQUENCE_NUMBER: Lazy<NestedFieldRef> = {
Lazy::new(|| {
Arc::new(NestedField::optional(
3,
"sequence_number",
Type::Primitive(PrimitiveType::Long),
))
})
};
static FILE_SEQUENCE_NUMBER: Lazy<NestedFieldRef> = {
Lazy::new(|| {
Arc::new(NestedField::optional(
4,
"file_sequence_number",
Type::Primitive(PrimitiveType::Long),
))
})
};
static CONTENT: Lazy<NestedFieldRef> = {
Lazy::new(|| {
Arc::new(NestedField::required(
134,
"content",
Type::Primitive(PrimitiveType::Int),
))
})
};
static FILE_PATH: Lazy<NestedFieldRef> = {
Lazy::new(|| {
Arc::new(NestedField::required(
100,
"file_path",
Type::Primitive(PrimitiveType::String),
))
})
};
static FILE_FORMAT: Lazy<NestedFieldRef> = {
Lazy::new(|| {
Arc::new(NestedField::required(
101,
"file_format",
Type::Primitive(PrimitiveType::String),
))
})
};
static RECORD_COUNT: Lazy<NestedFieldRef> = {
Lazy::new(|| {
Arc::new(NestedField::required(
103,
"record_count",
Type::Primitive(PrimitiveType::Long),
))
})
};
static FILE_SIZE_IN_BYTES: Lazy<NestedFieldRef> = {
Lazy::new(|| {
Arc::new(NestedField::required(
104,
"file_size_in_bytes",
Type::Primitive(PrimitiveType::Long),
))
})
};
static BLOCK_SIZE_IN_BYTES: Lazy<NestedFieldRef> = {
Lazy::new(|| {
Arc::new(NestedField::required(
105,
"block_size_in_bytes",
Type::Primitive(PrimitiveType::Long),
))
})
};
static COLUMN_SIZES: Lazy<NestedFieldRef> = {
Lazy::new(|| {
Arc::new(NestedField::optional(
108,
"column_sizes",
Type::Map(MapType {
key_field: Arc::new(NestedField::required(
117,
"key",
Type::Primitive(PrimitiveType::Int),
)),
value_field: Arc::new(NestedField::required(
118,
"value",
Type::Primitive(PrimitiveType::Long),
)),
}),
))
})
};
static VALUE_COUNTS: Lazy<NestedFieldRef> = {
Lazy::new(|| {
Arc::new(NestedField::optional(
109,
"value_counts",
Type::Map(MapType {
key_field: Arc::new(NestedField::required(
119,
"key",
Type::Primitive(PrimitiveType::Int),
)),
value_field: Arc::new(NestedField::required(
120,
"value",
Type::Primitive(PrimitiveType::Long),
)),
}),
))
})
};
static NULL_VALUE_COUNTS: Lazy<NestedFieldRef> = {
Lazy::new(|| {
Arc::new(NestedField::optional(
110,
"null_value_counts",
Type::Map(MapType {
key_field: Arc::new(NestedField::required(
121,
"key",
Type::Primitive(PrimitiveType::Int),
)),
value_field: Arc::new(NestedField::required(
122,
"value",
Type::Primitive(PrimitiveType::Long),
)),
}),
))
})
};
static NAN_VALUE_COUNTS: Lazy<NestedFieldRef> = {
Lazy::new(|| {
Arc::new(NestedField::optional(
137,
"nan_value_counts",
Type::Map(MapType {
key_field: Arc::new(NestedField::required(
138,
"key",
Type::Primitive(PrimitiveType::Int),
)),
value_field: Arc::new(NestedField::required(
139,
"value",
Type::Primitive(PrimitiveType::Long),
)),
}),
))
})
};
static LOWER_BOUNDS: Lazy<NestedFieldRef> = {
Lazy::new(|| {
Arc::new(NestedField::optional(
125,
"lower_bounds",
Type::Map(MapType {
key_field: Arc::new(NestedField::required(
126,
"key",
Type::Primitive(PrimitiveType::Int),
)),
value_field: Arc::new(NestedField::required(
127,
"value",
Type::Primitive(PrimitiveType::Binary),
)),
}),
))
})
};
static UPPER_BOUNDS: Lazy<NestedFieldRef> = {
Lazy::new(|| {
Arc::new(NestedField::optional(
128,
"upper_bounds",
Type::Map(MapType {
key_field: Arc::new(NestedField::required(
129,
"key",
Type::Primitive(PrimitiveType::Int),
)),
value_field: Arc::new(NestedField::required(
130,
"value",
Type::Primitive(PrimitiveType::Binary),
)),
}),
))
})
};
static KEY_METADATA: Lazy<NestedFieldRef> = {
Lazy::new(|| {
Arc::new(NestedField::optional(
131,
"key_metadata",
Type::Primitive(PrimitiveType::Binary),
))
})
};
static SPLIT_OFFSETS: Lazy<NestedFieldRef> = {
Lazy::new(|| {
Arc::new(NestedField::optional(
132,
"split_offsets",
Type::List(ListType {
element_field: Arc::new(NestedField::required(
133,
"element",
Type::Primitive(PrimitiveType::Long),
)),
}),
))
})
};
static EQUALITY_IDS: Lazy<NestedFieldRef> = {
Lazy::new(|| {
Arc::new(NestedField::optional(
135,
"equality_ids",
Type::List(ListType {
element_field: Arc::new(NestedField::required(
136,
"element",
Type::Primitive(PrimitiveType::Int),
)),
}),
))
})
};
static SORT_ORDER_ID: Lazy<NestedFieldRef> = {
Lazy::new(|| {
Arc::new(NestedField::optional(
140,
"sort_order_id",
Type::Primitive(PrimitiveType::Int),
))
})
};
fn data_file_fields_v2(partition_type: &StructType) -> Vec<NestedFieldRef> {
vec![
CONTENT.clone(),
FILE_PATH.clone(),
FILE_FORMAT.clone(),
Arc::new(NestedField::required(
102,
"partition",
Type::Struct(partition_type.clone()),
)),
RECORD_COUNT.clone(),
FILE_SIZE_IN_BYTES.clone(),
COLUMN_SIZES.clone(),
VALUE_COUNTS.clone(),
NULL_VALUE_COUNTS.clone(),
NAN_VALUE_COUNTS.clone(),
LOWER_BOUNDS.clone(),
UPPER_BOUNDS.clone(),
KEY_METADATA.clone(),
SPLIT_OFFSETS.clone(),
EQUALITY_IDS.clone(),
SORT_ORDER_ID.clone(),
]
}
pub(super) fn data_file_schema_v2(partition_type: &StructType) -> Result<AvroSchema, Error> {
let schema = Schema::builder()
.with_fields(data_file_fields_v2(partition_type))
.build()?;
schema_to_avro_schema("data_file", &schema)
}
pub(super) fn manifest_schema_v2(partition_type: &StructType) -> Result<AvroSchema, Error> {
let fields = vec![
STATUS.clone(),
SNAPSHOT_ID_V2.clone(),
SEQUENCE_NUMBER.clone(),
FILE_SEQUENCE_NUMBER.clone(),
Arc::new(NestedField::required(
2,
"data_file",
Type::Struct(StructType::new(data_file_fields_v2(partition_type))),
)),
];
let schema = Schema::builder().with_fields(fields).build()?;
schema_to_avro_schema("manifest_entry", &schema)
}
fn data_file_fields_v1(partition_type: &StructType) -> Vec<NestedFieldRef> {
vec![
FILE_PATH.clone(),
FILE_FORMAT.clone(),
Arc::new(NestedField::required(
102,
"partition",
Type::Struct(partition_type.clone()),
)),
RECORD_COUNT.clone(),
FILE_SIZE_IN_BYTES.clone(),
BLOCK_SIZE_IN_BYTES.clone(),
COLUMN_SIZES.clone(),
VALUE_COUNTS.clone(),
NULL_VALUE_COUNTS.clone(),
NAN_VALUE_COUNTS.clone(),
LOWER_BOUNDS.clone(),
UPPER_BOUNDS.clone(),
KEY_METADATA.clone(),
SPLIT_OFFSETS.clone(),
SORT_ORDER_ID.clone(),
]
}
pub(super) fn data_file_schema_v1(partition_type: &StructType) -> Result<AvroSchema, Error> {
let schema = Schema::builder()
.with_fields(data_file_fields_v1(partition_type))
.build()?;
schema_to_avro_schema("data_file", &schema)
}
pub(super) fn manifest_schema_v1(partition_type: &StructType) -> Result<AvroSchema, Error> {
let fields = vec![
STATUS.clone(),
SNAPSHOT_ID_V1.clone(),
Arc::new(NestedField::required(
2,
"data_file",
Type::Struct(StructType::new(data_file_fields_v1(partition_type))),
)),
];
let schema = Schema::builder().with_fields(fields).build()?;
schema_to_avro_schema("manifest_entry", &schema)
}
}
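/// Meta of a manifest, stored in the key-value user metadata of the avro file.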
#[derive(Debug, PartialEq, Clone, Eq, TypedBuilder)]
pub struct ManifestMetadata {
schema: SchemaRef,
schema_id: SchemaId,
partition_spec: PartitionSpec,
format_version: FormatVersion,
content: ManifestContentType,
}
impl ManifestMetadata {
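/// Parse metadata from the key-value user metadata of an avro manifest file.
/// A missing `format-version` defaults to V1; a missing `content` defaults to
/// [`ManifestContentType::Data`].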
pub fn parse(meta: &HashMap<String, Vec<u8>>) -> Result<Self> {
let schema = Arc::new({
let bs = meta.get("schema").ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
"schema is required in manifest metadata but not found",
)
})?;
serde_json::from_slice::<Schema>(bs).map_err(|err| {
Error::new(
ErrorKind::DataInvalid,
"Fail to parse schema in manifest metadata",
)
.with_source(err)
})?
});
let schema_id: i32 = meta
.get("schema-id")
.map(|bs| {
String::from_utf8_lossy(bs).parse().map_err(|err| {
Error::new(
ErrorKind::DataInvalid,
"Fail to parse schema id in manifest metadata",
)
.with_source(err)
})
})
.transpose()?
.unwrap_or(0);
let partition_spec = {
let fields = {
let bs = meta.get("partition-spec").ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
"partition-spec is required in manifest metadata but not found",
)
})?;
serde_json::from_slice::<Vec<PartitionField>>(bs).map_err(|err| {
Error::new(
ErrorKind::DataInvalid,
"Fail to parse partition spec in manifest metadata",
)
.with_source(err)
})?
};
let spec_id = meta
.get("partition-spec-id")
.map(|bs| {
String::from_utf8_lossy(bs).parse().map_err(|err| {
Error::new(
ErrorKind::DataInvalid,
"Fail to parse partition spec id in manifest metadata",
)
.with_source(err)
})
})
.transpose()?
.unwrap_or(0);
PartitionSpec::builder(schema.clone())
.with_spec_id(spec_id)
.add_unbound_fields(fields.into_iter().map(|f| f.into_unbound()))?
.build()?
};
let format_version = if let Some(bs) = meta.get("format-version") {
serde_json::from_slice::<FormatVersion>(bs).map_err(|err| {
Error::new(
ErrorKind::DataInvalid,
"Fail to parse format version in manifest metadata",
)
.with_source(err)
})?
} else {
FormatVersion::V1
};
let content = if let Some(v) = meta.get("content") {
let v = String::from_utf8_lossy(v);
v.parse()?
} else {
ManifestContentType::Data
};
Ok(ManifestMetadata {
schema,
schema_id,
partition_spec,
format_version,
content,
})
}
}
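/// Reference to [`ManifestEntry`].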
pub type ManifestEntryRef = Arc<ManifestEntry>;
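/// A manifest entry: a data file or delete file together with its status and
/// tracking information (snapshot id and sequence numbers).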
#[derive(Debug, PartialEq, Eq, Clone, TypedBuilder)]
pub struct ManifestEntry {
status: ManifestStatus,
#[builder(default, setter(strip_option(fallback = snapshot_id_opt)))]
snapshot_id: Option<i64>,
#[builder(default, setter(strip_option(fallback = sequence_number_opt)))]
sequence_number: Option<i64>,
#[builder(default, setter(strip_option(fallback = file_sequence_number_opt)))]
file_sequence_number: Option<i64>,
data_file: DataFile,
}
impl ManifestEntry {
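/// Check if this entry is alive, i.e. its status is `Added` or `Existing`.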
pub fn is_alive(&self) -> bool {
matches!(
self.status,
ManifestStatus::Added | ManifestStatus::Existing
)
}
pub fn status(&self) -> ManifestStatus {
self.status
}
#[inline]
pub fn content_type(&self) -> DataContentType {
self.data_file.content
}
#[inline]
pub fn file_format(&self) -> DataFileFormat {
self.data_file.file_format
}
#[inline]
pub fn file_path(&self) -> &str {
&self.data_file.file_path
}
#[inline]
pub fn record_count(&self) -> u64 {
self.data_file.record_count
}
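/// Inherit the snapshot id and the (file) sequence numbers from the owning
/// [`ManifestFile`] when they were not explicitly set, following the
/// metadata inheritance rules for newly added entries.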
pub(crate) fn inherit_data(&mut self, snapshot_entry: &ManifestFile) {
if self.snapshot_id.is_none() {
self.snapshot_id = Some(snapshot_entry.added_snapshot_id);
}
if self.sequence_number.is_none()
&& (self.status == ManifestStatus::Added
|| snapshot_entry.sequence_number == INITIAL_SEQUENCE_NUMBER)
{
self.sequence_number = Some(snapshot_entry.sequence_number);
}
if self.file_sequence_number.is_none()
&& (self.status == ManifestStatus::Added
|| snapshot_entry.sequence_number == INITIAL_SEQUENCE_NUMBER)
{
self.file_sequence_number = Some(snapshot_entry.sequence_number);
}
}
#[inline]
pub fn snapshot_id(&self) -> Option<i64> {
self.snapshot_id
}
#[inline]
pub fn sequence_number(&self) -> Option<i64> {
self.sequence_number
}
#[inline]
pub fn file_size_in_bytes(&self) -> u64 {
self.data_file.file_size_in_bytes
}
#[inline]
pub fn data_file(&self) -> &DataFile {
&self.data_file
}
}
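/// Status of a manifest entry. Serialized as an int in the avro file:
/// 0 = Existing, 1 = Added, 2 = Deleted.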
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum ManifestStatus {
Existing = 0,
Added = 1,
Deleted = 2,
}
impl TryFrom<i32> for ManifestStatus {
type Error = Error;
fn try_from(v: i32) -> Result<ManifestStatus> {
match v {
0 => Ok(ManifestStatus::Existing),
1 => Ok(ManifestStatus::Added),
2 => Ok(ManifestStatus::Deleted),
_ => Err(Error::new(
ErrorKind::DataInvalid,
format!("manifest status {} is invalid", v),
)),
}
}
}
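/// A data file: its path, format, partition tuple, and column-level metrics.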
#[derive(Debug, PartialEq, Clone, Eq, TypedBuilder)]
pub struct DataFile {
pub(crate) content: DataContentType,
pub(crate) file_path: String,
pub(crate) file_format: DataFileFormat,
pub(crate) partition: Struct,
pub(crate) record_count: u64,
pub(crate) file_size_in_bytes: u64,
#[builder(default)]
pub(crate) column_sizes: HashMap<i32, u64>,
#[builder(default)]
pub(crate) value_counts: HashMap<i32, u64>,
#[builder(default)]
pub(crate) null_value_counts: HashMap<i32, u64>,
#[builder(default)]
pub(crate) nan_value_counts: HashMap<i32, u64>,
#[builder(default)]
pub(crate) lower_bounds: HashMap<i32, Datum>,
#[builder(default)]
pub(crate) upper_bounds: HashMap<i32, Datum>,
#[builder(default)]
pub(crate) key_metadata: Option<Vec<u8>>,
#[builder(default)]
pub(crate) split_offsets: Vec<i64>,
#[builder(default)]
pub(crate) equality_ids: Vec<i32>,
#[builder(default, setter(strip_option))]
pub(crate) sort_order_id: Option<i32>,
}
impl DataFile {
pub fn content_type(&self) -> DataContentType {
self.content
}
pub fn file_path(&self) -> &str {
&self.file_path
}
pub fn file_format(&self) -> DataFileFormat {
self.file_format
}
pub fn partition(&self) -> &Struct {
&self.partition
}
pub fn record_count(&self) -> u64 {
self.record_count
}
pub fn file_size_in_bytes(&self) -> u64 {
self.file_size_in_bytes
}
pub fn column_sizes(&self) -> &HashMap<i32, u64> {
&self.column_sizes
}
pub fn value_counts(&self) -> &HashMap<i32, u64> {
&self.value_counts
}
pub fn null_value_counts(&self) -> &HashMap<i32, u64> {
&self.null_value_counts
}
pub fn nan_value_counts(&self) -> &HashMap<i32, u64> {
&self.nan_value_counts
}
pub fn lower_bounds(&self) -> &HashMap<i32, Datum> {
&self.lower_bounds
}
pub fn upper_bounds(&self) -> &HashMap<i32, Datum> {
&self.upper_bounds
}
pub fn key_metadata(&self) -> Option<&[u8]> {
self.key_metadata.as_deref()
}
pub fn split_offsets(&self) -> &[i64] {
&self.split_offsets
}
pub fn equality_ids(&self) -> &[i32] {
&self.equality_ids
}
pub fn sort_order_id(&self) -> Option<i32> {
self.sort_order_id
}
}
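/// Serialize data files as an avro stream (one record per [`DataFile`]),
/// returning the number of bytes flushed to `writer`.
///
/// A minimal round-trip sketch (not compiled as a doc-test); `data_files`,
/// `partition_type`, and `schema` are assumed to be in scope:
///
/// ```ignore
/// let mut buffer = Vec::new(); // any std::io::Write target works
/// write_data_files_to_avro(&mut buffer, data_files, &partition_type, FormatVersion::V2)?;
/// let round_tripped = read_data_files_from_avro(
///     &mut std::io::Cursor::new(buffer),
///     &schema,
///     &partition_type,
///     FormatVersion::V2,
/// )?;
/// ```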
pub fn write_data_files_to_avro<W: Write>(
writer: &mut W,
data_files: impl IntoIterator<Item = DataFile>,
partition_type: &StructType,
version: FormatVersion,
) -> Result<usize> {
let avro_schema = match version {
FormatVersion::V1 => _const_schema::data_file_schema_v1(partition_type)?,
FormatVersion::V2 => _const_schema::data_file_schema_v2(partition_type)?,
};
let mut writer = AvroWriter::new(&avro_schema, writer);
for data_file in data_files {
let value = to_value(_serde::DataFile::try_from(data_file, partition_type, true)?)?
.resolve(&avro_schema)?;
writer.append(value)?;
}
Ok(writer.flush()?)
}
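/// Parse [`DataFile`]s back from an avro stream produced by
/// [`write_data_files_to_avro`].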
pub fn read_data_files_from_avro<R: Read>(
reader: &mut R,
schema: &Schema,
partition_type: &StructType,
version: FormatVersion,
) -> Result<Vec<DataFile>> {
let avro_schema = match version {
FormatVersion::V1 => _const_schema::data_file_schema_v1(partition_type)?,
FormatVersion::V2 => _const_schema::data_file_schema_v2(partition_type)?,
};
let reader = AvroReader::with_schema(&avro_schema, reader)?;
reader
.into_iter()
.map(|value| from_value::<_serde::DataFile>(&value?)?.try_into(partition_type, schema))
.collect::<Result<Vec<_>>>()
}
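/// Content type of a data file: 0 = data, 1 = position deletes,
/// 2 = equality deletes.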
#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]
pub enum DataContentType {
Data = 0,
PositionDeletes = 1,
EqualityDeletes = 2,
}
impl TryFrom<i32> for DataContentType {
type Error = Error;
fn try_from(v: i32) -> Result<DataContentType> {
match v {
0 => Ok(DataContentType::Data),
1 => Ok(DataContentType::PositionDeletes),
2 => Ok(DataContentType::EqualityDeletes),
_ => Err(Error::new(
ErrorKind::DataInvalid,
format!("data content type {} is invalid", v),
)),
}
}
}
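/// File format of a data file: avro, orc, or parquet.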
#[derive(Debug, PartialEq, Eq, Clone, Copy, SerializeDisplay, DeserializeFromStr)]
pub enum DataFileFormat {
Avro,
Orc,
Parquet,
}
impl FromStr for DataFileFormat {
type Err = Error;
fn from_str(s: &str) -> Result<Self> {
match s.to_lowercase().as_str() {
"avro" => Ok(Self::Avro),
"orc" => Ok(Self::Orc),
"parquet" => Ok(Self::Parquet),
_ => Err(Error::new(
ErrorKind::DataInvalid,
format!("Unsupported data file format: {}", s),
)),
}
}
}
impl std::fmt::Display for DataFileFormat {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
DataFileFormat::Avro => write!(f, "avro"),
DataFileFormat::Orc => write!(f, "orc"),
DataFileFormat::Parquet => write!(f, "parquet"),
}
}
}
mod _serde {
use std::collections::HashMap;
use serde_derive::{Deserialize, Serialize};
use serde_with::serde_as;
use super::ManifestEntry;
use crate::spec::{Datum, Literal, RawLiteral, Schema, Struct, StructType, Type};
use crate::{Error, ErrorKind};
#[derive(Serialize, Deserialize)]
pub(super) struct ManifestEntryV2 {
status: i32,
snapshot_id: Option<i64>,
sequence_number: Option<i64>,
file_sequence_number: Option<i64>,
data_file: DataFile,
}
impl ManifestEntryV2 {
pub fn try_from(value: ManifestEntry, partition_type: &StructType) -> Result<Self, Error> {
Ok(Self {
status: value.status as i32,
snapshot_id: value.snapshot_id,
sequence_number: value.sequence_number,
file_sequence_number: value.file_sequence_number,
data_file: DataFile::try_from(value.data_file, partition_type, false)?,
})
}
pub fn try_into(
self,
partition_type: &StructType,
schema: &Schema,
) -> Result<ManifestEntry, Error> {
Ok(ManifestEntry {
status: self.status.try_into()?,
snapshot_id: self.snapshot_id,
sequence_number: self.sequence_number,
file_sequence_number: self.file_sequence_number,
data_file: self.data_file.try_into(partition_type, schema)?,
})
}
}
#[derive(Serialize, Deserialize)]
pub(super) struct ManifestEntryV1 {
status: i32,
pub snapshot_id: i64,
data_file: DataFile,
}
impl ManifestEntryV1 {
pub fn try_from(value: ManifestEntry, partition_type: &StructType) -> Result<Self, Error> {
Ok(Self {
status: value.status as i32,
snapshot_id: value.snapshot_id.unwrap_or_default(),
data_file: DataFile::try_from(value.data_file, partition_type, true)?,
})
}
pub fn try_into(
self,
partition_type: &StructType,
schema: &Schema,
) -> Result<ManifestEntry, Error> {
Ok(ManifestEntry {
status: self.status.try_into()?,
snapshot_id: Some(self.snapshot_id),
sequence_number: Some(0),
file_sequence_number: Some(0),
data_file: self.data_file.try_into(partition_type, schema)?,
})
}
}
#[serde_as]
#[derive(Serialize, Deserialize)]
pub(super) struct DataFile {
#[serde(default)]
content: i32,
file_path: String,
file_format: String,
partition: RawLiteral,
record_count: i64,
file_size_in_bytes: i64,
#[serde(skip_deserializing, skip_serializing_if = "Option::is_none")]
block_size_in_bytes: Option<i64>,
column_sizes: Option<Vec<I64Entry>>,
value_counts: Option<Vec<I64Entry>>,
null_value_counts: Option<Vec<I64Entry>>,
nan_value_counts: Option<Vec<I64Entry>>,
lower_bounds: Option<Vec<BytesEntry>>,
upper_bounds: Option<Vec<BytesEntry>>,
key_metadata: Option<serde_bytes::ByteBuf>,
split_offsets: Option<Vec<i64>>,
#[serde(default)]
equality_ids: Option<Vec<i32>>,
sort_order_id: Option<i32>,
}
impl DataFile {
pub fn try_from(
value: super::DataFile,
partition_type: &StructType,
is_version_1: bool,
) -> Result<Self, Error> {
let block_size_in_bytes = if is_version_1 { Some(0) } else { None };
Ok(Self {
content: value.content as i32,
file_path: value.file_path,
file_format: value.file_format.to_string().to_ascii_uppercase(),
partition: RawLiteral::try_from(
Literal::Struct(value.partition),
&Type::Struct(partition_type.clone()),
)?,
record_count: value.record_count.try_into()?,
file_size_in_bytes: value.file_size_in_bytes.try_into()?,
block_size_in_bytes,
column_sizes: Some(to_i64_entry(value.column_sizes)?),
value_counts: Some(to_i64_entry(value.value_counts)?),
null_value_counts: Some(to_i64_entry(value.null_value_counts)?),
nan_value_counts: Some(to_i64_entry(value.nan_value_counts)?),
lower_bounds: Some(to_bytes_entry(value.lower_bounds)?),
upper_bounds: Some(to_bytes_entry(value.upper_bounds)?),
key_metadata: value.key_metadata.map(serde_bytes::ByteBuf::from),
split_offsets: Some(value.split_offsets),
equality_ids: Some(value.equality_ids),
sort_order_id: value.sort_order_id,
})
}
pub fn try_into(
self,
partition_type: &StructType,
schema: &Schema,
) -> Result<super::DataFile, Error> {
let partition = self
.partition
.try_into(&Type::Struct(partition_type.clone()))?
.map(|v| {
if let Literal::Struct(v) = v {
Ok(v)
} else {
Err(Error::new(
ErrorKind::DataInvalid,
"partition value is not a struct",
))
}
})
.transpose()?
.unwrap_or(Struct::empty());
Ok(super::DataFile {
content: self.content.try_into()?,
file_path: self.file_path,
file_format: self.file_format.parse()?,
partition,
record_count: self.record_count.try_into()?,
file_size_in_bytes: self.file_size_in_bytes.try_into()?,
column_sizes: self
.column_sizes
.map(parse_i64_entry)
.transpose()?
.unwrap_or_default(),
value_counts: self
.value_counts
.map(parse_i64_entry)
.transpose()?
.unwrap_or_default(),
null_value_counts: self
.null_value_counts
.map(parse_i64_entry)
.transpose()?
.unwrap_or_default(),
nan_value_counts: self
.nan_value_counts
.map(parse_i64_entry)
.transpose()?
.unwrap_or_default(),
lower_bounds: self
.lower_bounds
.map(|v| parse_bytes_entry(v, schema))
.transpose()?
.unwrap_or_default(),
upper_bounds: self
.upper_bounds
.map(|v| parse_bytes_entry(v, schema))
.transpose()?
.unwrap_or_default(),
key_metadata: self.key_metadata.map(|v| v.to_vec()),
split_offsets: self.split_offsets.unwrap_or_default(),
equality_ids: self.equality_ids.unwrap_or_default(),
sort_order_id: self.sort_order_id,
})
}
}
#[serde_as]
#[derive(Serialize, Deserialize)]
#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
struct BytesEntry {
key: i32,
value: serde_bytes::ByteBuf,
}
fn parse_bytes_entry(
v: Vec<BytesEntry>,
schema: &Schema,
) -> Result<HashMap<i32, Datum>, Error> {
let mut m = HashMap::with_capacity(v.len());
for entry in v {
if let Some(field) = schema.field_by_id(entry.key) {
let data_type = field
.field_type
.as_primitive_type()
.ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!("field {} is not a primitive type", field.name),
)
})?
.clone();
m.insert(entry.key, Datum::try_from_bytes(&entry.value, data_type)?);
}
}
Ok(m)
}
fn to_bytes_entry(v: impl IntoIterator<Item = (i32, Datum)>) -> Result<Vec<BytesEntry>, Error> {
let iter = v.into_iter();
let mut bs = Vec::with_capacity(iter.size_hint().0);
for (k, d) in iter {
bs.push(BytesEntry {
key: k,
value: d.to_bytes()?,
});
}
Ok(bs)
}
#[derive(Serialize, Deserialize)]
#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
struct I64Entry {
key: i32,
value: i64,
}
fn parse_i64_entry(v: Vec<I64Entry>) -> Result<HashMap<i32, u64>, Error> {
let mut m = HashMap::with_capacity(v.len());
for entry in v {
if let Ok(v) = entry.value.try_into() {
m.insert(entry.key, v);
}
}
Ok(m)
}
fn to_i64_entry(entries: HashMap<i32, u64>) -> Result<Vec<I64Entry>, Error> {
entries
.iter()
.map(|e| {
Ok(I64Entry {
key: *e.0,
value: (*e.1).try_into()?,
})
})
.collect()
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use crate::spec::manifest::_serde::{parse_i64_entry, I64Entry};
#[test]
fn test_parse_negative_manifest_entry() {
let entries = vec![
I64Entry { key: 1, value: -1 },
I64Entry { key: 2, value: 3 },
];
let ret = parse_i64_entry(entries).unwrap();
let expected_ret = HashMap::from([(2, 3)]);
assert_eq!(ret, expected_ret, "Negative i64 entry should be ignored!");
}
}
}
#[cfg(test)]
mod tests {
use std::fs;
use std::io::Cursor;
use std::sync::Arc;
use tempfile::TempDir;
use super::*;
use crate::io::FileIOBuilder;
use crate::spec::{Literal, NestedField, PrimitiveType, Struct, Transform, Type};
#[tokio::test]
async fn test_parse_manifest_v2_unpartition() {
let schema = Arc::new(
Schema::builder()
.with_fields(vec![
Arc::new(NestedField::optional(
1,
"id",
Type::Primitive(PrimitiveType::Long),
)),
Arc::new(NestedField::optional(
2,
"v_int",
Type::Primitive(PrimitiveType::Int),
)),
Arc::new(NestedField::optional(
3,
"v_long",
Type::Primitive(PrimitiveType::Long),
)),
Arc::new(NestedField::optional(
4,
"v_float",
Type::Primitive(PrimitiveType::Float),
)),
Arc::new(NestedField::optional(
5,
"v_double",
Type::Primitive(PrimitiveType::Double),
)),
Arc::new(NestedField::optional(
6,
"v_varchar",
Type::Primitive(PrimitiveType::String),
)),
Arc::new(NestedField::optional(
7,
"v_bool",
Type::Primitive(PrimitiveType::Boolean),
)),
Arc::new(NestedField::optional(
8,
"v_date",
Type::Primitive(PrimitiveType::Date),
)),
Arc::new(NestedField::optional(
9,
"v_timestamp",
Type::Primitive(PrimitiveType::Timestamptz),
)),
Arc::new(NestedField::optional(
10,
"v_decimal",
Type::Primitive(PrimitiveType::Decimal {
precision: 36,
scale: 10,
}),
)),
Arc::new(NestedField::optional(
11,
"v_ts_ntz",
Type::Primitive(PrimitiveType::Timestamp),
)),
Arc::new(NestedField::optional(
12,
"v_ts_ns_ntz",
Type::Primitive(PrimitiveType::TimestampNs),
)),
])
.build()
.unwrap(),
);
let metadata = ManifestMetadata {
schema_id: 0,
schema: schema.clone(),
partition_spec: PartitionSpec::builder(schema)
.with_spec_id(0)
.build()
.unwrap(),
content: ManifestContentType::Data,
format_version: FormatVersion::V2,
};
let mut entries = vec![
ManifestEntry {
status: ManifestStatus::Added,
snapshot_id: None,
sequence_number: None,
file_sequence_number: None,
data_file: DataFile {
content: DataContentType::Data,
file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
file_format: DataFileFormat::Parquet,
partition: Struct::empty(),
record_count: 1,
file_size_in_bytes: 5442,
column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
nan_value_counts: HashMap::new(),
lower_bounds: HashMap::new(),
upper_bounds: HashMap::new(),
key_metadata: None,
split_offsets: vec![4],
equality_ids: Vec::new(),
sort_order_id: None,
}
}
];
let tmp_dir = TempDir::new().unwrap();
let path = tmp_dir.path().join("test_manifest.avro");
let io = FileIOBuilder::new_fs_io().build().unwrap();
let output_file = io.new_output(path.to_str().unwrap()).unwrap();
let mut writer = ManifestWriterBuilder::new(
output_file,
Some(1),
vec![],
metadata.schema.clone(),
metadata.partition_spec.clone(),
)
.build_v2_data();
for entry in &entries {
writer.add_entry(entry.clone()).unwrap();
}
writer.write_manifest_file().await.unwrap();
let actual_manifest =
Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
.unwrap();
entries[0].snapshot_id = Some(1);
assert_eq!(actual_manifest, Manifest::new(metadata, entries));
}
#[tokio::test]
async fn test_parse_manifest_v2_partition() {
let schema = Arc::new(
Schema::builder()
.with_fields(vec![
Arc::new(NestedField::optional(
1,
"id",
Type::Primitive(PrimitiveType::Long),
)),
Arc::new(NestedField::optional(
2,
"v_int",
Type::Primitive(PrimitiveType::Int),
)),
Arc::new(NestedField::optional(
3,
"v_long",
Type::Primitive(PrimitiveType::Long),
)),
Arc::new(NestedField::optional(
4,
"v_float",
Type::Primitive(PrimitiveType::Float),
)),
Arc::new(NestedField::optional(
5,
"v_double",
Type::Primitive(PrimitiveType::Double),
)),
Arc::new(NestedField::optional(
6,
"v_varchar",
Type::Primitive(PrimitiveType::String),
)),
Arc::new(NestedField::optional(
7,
"v_bool",
Type::Primitive(PrimitiveType::Boolean),
)),
Arc::new(NestedField::optional(
8,
"v_date",
Type::Primitive(PrimitiveType::Date),
)),
Arc::new(NestedField::optional(
9,
"v_timestamp",
Type::Primitive(PrimitiveType::Timestamptz),
)),
Arc::new(NestedField::optional(
10,
"v_decimal",
Type::Primitive(PrimitiveType::Decimal {
precision: 36,
scale: 10,
}),
)),
Arc::new(NestedField::optional(
11,
"v_ts_ntz",
Type::Primitive(PrimitiveType::Timestamp),
)),
Arc::new(NestedField::optional(
12,
"v_ts_ns_ntz",
Type::Primitive(PrimitiveType::TimestampNs),
)),
])
.build()
.unwrap(),
);
let metadata = ManifestMetadata {
schema_id: 0,
schema: schema.clone(),
partition_spec: PartitionSpec::builder(schema)
.with_spec_id(0)
.add_partition_field("v_int", "v_int", Transform::Identity)
.unwrap()
.add_partition_field("v_long", "v_long", Transform::Identity)
.unwrap()
.build()
.unwrap(),
content: ManifestContentType::Data,
format_version: FormatVersion::V2,
};
let mut entries = vec![ManifestEntry {
status: ManifestStatus::Added,
snapshot_id: None,
sequence_number: None,
file_sequence_number: None,
data_file: DataFile {
content: DataContentType::Data,
file_format: DataFileFormat::Parquet,
file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
partition: Struct::from_iter(
vec![
Some(Literal::int(1)),
Some(Literal::long(1000)),
]
.into_iter()
),
record_count: 1,
file_size_in_bytes: 5442,
column_sizes: HashMap::from([
(0, 73),
(6, 34),
(2, 73),
(7, 61),
(3, 61),
(5, 62),
(9, 79),
(10, 73),
(1, 61),
(4, 73),
(8, 73)
]),
value_counts: HashMap::from([
(4, 1),
(5, 1),
(2, 1),
(0, 1),
(3, 1),
(6, 1),
(8, 1),
(1, 1),
(10, 1),
(7, 1),
(9, 1)
]),
null_value_counts: HashMap::from([
(1, 0),
(6, 0),
(2, 0),
(8, 0),
(0, 0),
(3, 0),
(5, 0),
(9, 0),
(7, 0),
(4, 0),
(10, 0)
]),
nan_value_counts: HashMap::new(),
lower_bounds: HashMap::new(),
upper_bounds: HashMap::new(),
key_metadata: None,
split_offsets: vec![4],
equality_ids: vec![],
sort_order_id: None,
},
}];
let tmp_dir = TempDir::new().unwrap();
let path = tmp_dir.path().join("test_manifest.avro");
let io = FileIOBuilder::new_fs_io().build().unwrap();
let output_file = io.new_output(path.to_str().unwrap()).unwrap();
let mut writer = ManifestWriterBuilder::new(
output_file,
Some(2),
vec![],
metadata.schema.clone(),
metadata.partition_spec.clone(),
)
.build_v2_data();
for entry in &entries {
writer.add_entry(entry.clone()).unwrap();
}
let manifest_file = writer.write_manifest_file().await.unwrap();
assert_eq!(manifest_file.sequence_number, UNASSIGNED_SEQUENCE_NUMBER);
assert_eq!(
manifest_file.min_sequence_number,
UNASSIGNED_SEQUENCE_NUMBER
);
let actual_manifest =
Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
.unwrap();
entries[0].snapshot_id = Some(2);
assert_eq!(actual_manifest, Manifest::new(metadata, entries));
}
#[tokio::test]
async fn test_parse_manifest_v1_unpartition() {
let schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_fields(vec![
Arc::new(NestedField::optional(
1,
"id",
Type::Primitive(PrimitiveType::Int),
)),
Arc::new(NestedField::optional(
2,
"data",
Type::Primitive(PrimitiveType::String),
)),
Arc::new(NestedField::optional(
3,
"comment",
Type::Primitive(PrimitiveType::String),
)),
])
.build()
.unwrap(),
);
let metadata = ManifestMetadata {
schema_id: 1,
schema: schema.clone(),
partition_spec: PartitionSpec::builder(schema)
.with_spec_id(0)
.build()
.unwrap(),
content: ManifestContentType::Data,
format_version: FormatVersion::V1,
};
let mut entries = vec![ManifestEntry {
status: ManifestStatus::Added,
snapshot_id: Some(0),
sequence_number: Some(0),
file_sequence_number: Some(0),
data_file: DataFile {
content: DataContentType::Data,
file_path: "s3://testbucket/iceberg_data/iceberg_ctl/iceberg_db/iceberg_tbl/data/00000-7-45268d71-54eb-476c-b42c-942d880c04a1-00001.parquet".to_string(),
file_format: DataFileFormat::Parquet,
partition: Struct::empty(),
record_count: 1,
file_size_in_bytes: 875,
column_sizes: HashMap::from([(1,47),(2,48),(3,52)]),
value_counts: HashMap::from([(1,1),(2,1),(3,1)]),
null_value_counts: HashMap::from([(1,0),(2,0),(3,0)]),
nan_value_counts: HashMap::new(),
lower_bounds: HashMap::from([(1,Datum::int(1)),(2,Datum::string("a")),(3,Datum::string("AC/DC"))]),
upper_bounds: HashMap::from([(1,Datum::int(1)),(2,Datum::string("a")),(3,Datum::string("AC/DC"))]),
key_metadata: None,
split_offsets: vec![4],
equality_ids: vec![],
sort_order_id: Some(0),
}
}];
let tmp_dir = TempDir::new().unwrap();
let path = tmp_dir.path().join("test_manifest.avro");
let io = FileIOBuilder::new_fs_io().build().unwrap();
let output_file = io.new_output(path.to_str().unwrap()).unwrap();
let mut writer = ManifestWriterBuilder::new(
output_file,
Some(3),
vec![],
metadata.schema.clone(),
metadata.partition_spec.clone(),
)
.build_v1();
for entry in &entries {
writer.add_entry(entry.clone()).unwrap();
}
writer.write_manifest_file().await.unwrap();
let actual_manifest =
Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
.unwrap();
entries[0].snapshot_id = Some(3);
assert_eq!(actual_manifest, Manifest::new(metadata, entries));
}
#[tokio::test]
async fn test_parse_manifest_v1_partition() {
let schema = Arc::new(
Schema::builder()
.with_fields(vec![
Arc::new(NestedField::optional(
1,
"id",
Type::Primitive(PrimitiveType::Long),
)),
Arc::new(NestedField::optional(
2,
"data",
Type::Primitive(PrimitiveType::String),
)),
Arc::new(NestedField::optional(
3,
"category",
Type::Primitive(PrimitiveType::String),
)),
])
.build()
.unwrap(),
);
let metadata = ManifestMetadata {
schema_id: 0,
schema: schema.clone(),
partition_spec: PartitionSpec::builder(schema)
.add_partition_field("category", "category", Transform::Identity)
.unwrap()
.build()
.unwrap(),
content: ManifestContentType::Data,
format_version: FormatVersion::V1,
};
let mut entries = vec![
ManifestEntry {
status: ManifestStatus::Added,
snapshot_id: Some(0),
sequence_number: Some(0),
file_sequence_number: Some(0),
data_file: DataFile {
content: DataContentType::Data,
file_path: "s3://testbucket/prod/db/sample/data/category=x/00010-1-d5c93668-1e52-41ac-92a6-bba590cbf249-00001.parquet".to_string(),
file_format: DataFileFormat::Parquet,
partition: Struct::from_iter(
vec![
Some(
Literal::string("x"),
),
]
.into_iter()
),
record_count: 1,
file_size_in_bytes: 874,
column_sizes: HashMap::from([(1, 46), (2, 48), (3, 48)]),
value_counts: HashMap::from([(1, 1), (2, 1), (3, 1)]),
null_value_counts: HashMap::from([(1, 0), (2, 0), (3, 0)]),
nan_value_counts: HashMap::new(),
lower_bounds: HashMap::from([
(1, Datum::long(1)),
(2, Datum::string("a")),
(3, Datum::string("x"))
]),
upper_bounds: HashMap::from([
(1, Datum::long(1)),
(2, Datum::string("a")),
(3, Datum::string("x"))
]),
key_metadata: None,
split_offsets: vec![4],
equality_ids: vec![],
sort_order_id: Some(0),
},
}
];
let tmp_dir = TempDir::new().unwrap();
let path = tmp_dir.path().join("test_manifest.avro");
let io = FileIOBuilder::new_fs_io().build().unwrap();
let output_file = io.new_output(path.to_str().unwrap()).unwrap();
let mut writer = ManifestWriterBuilder::new(
output_file,
Some(2),
vec![],
metadata.schema.clone(),
metadata.partition_spec.clone(),
)
.build_v1();
for entry in &entries {
writer.add_entry(entry.clone()).unwrap();
}
let manifest_file = writer.write_manifest_file().await.unwrap();
assert_eq!(manifest_file.partitions.len(), 1);
assert_eq!(
manifest_file.partitions[0].lower_bound,
Some(Datum::string("x"))
);
assert_eq!(
manifest_file.partitions[0].upper_bound,
Some(Datum::string("x"))
);
let actual_manifest =
Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
.unwrap();
entries[0].snapshot_id = Some(2);
assert_eq!(actual_manifest, Manifest::new(metadata, entries));
}
#[tokio::test]
async fn test_parse_manifest_with_schema_evolution() {
let schema = Arc::new(
Schema::builder()
.with_fields(vec![
Arc::new(NestedField::optional(
1,
"id",
Type::Primitive(PrimitiveType::Long),
)),
Arc::new(NestedField::optional(
2,
"v_int",
Type::Primitive(PrimitiveType::Int),
)),
])
.build()
.unwrap(),
);
let metadata = ManifestMetadata {
schema_id: 0,
schema: schema.clone(),
partition_spec: PartitionSpec::builder(schema)
.with_spec_id(0)
.build()
.unwrap(),
content: ManifestContentType::Data,
format_version: FormatVersion::V2,
};
let entries = vec![ManifestEntry {
status: ManifestStatus::Added,
snapshot_id: None,
sequence_number: None,
file_sequence_number: None,
data_file: DataFile {
content: DataContentType::Data,
file_format: DataFileFormat::Parquet,
file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
partition: Struct::empty(),
record_count: 1,
file_size_in_bytes: 5442,
column_sizes: HashMap::from([
(1, 61),
(2, 73),
(3, 61),
]),
value_counts: HashMap::default(),
null_value_counts: HashMap::default(),
nan_value_counts: HashMap::new(),
lower_bounds: HashMap::from([
(1, Datum::long(1)),
(2, Datum::int(2)),
(3, Datum::string("x"))
]),
upper_bounds: HashMap::from([
(1, Datum::long(1)),
(2, Datum::int(2)),
(3, Datum::string("x"))
]),
key_metadata: None,
split_offsets: vec![4],
equality_ids: vec![],
sort_order_id: None,
},
}];
let tmp_dir = TempDir::new().unwrap();
let path = tmp_dir.path().join("test_manifest.avro");
let io = FileIOBuilder::new_fs_io().build().unwrap();
let output_file = io.new_output(path.to_str().unwrap()).unwrap();
let mut writer = ManifestWriterBuilder::new(
output_file,
Some(2),
vec![],
metadata.schema.clone(),
metadata.partition_spec.clone(),
)
.build_v2_data();
for entry in &entries {
writer.add_entry(entry.clone()).unwrap();
}
writer.write_manifest_file().await.unwrap();
let actual_manifest =
Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
.unwrap();
let schema = Arc::new(
Schema::builder()
.with_fields(vec![
Arc::new(NestedField::optional(
1,
"id",
Type::Primitive(PrimitiveType::Long),
)),
Arc::new(NestedField::optional(
2,
"v_int",
Type::Primitive(PrimitiveType::Int),
)),
])
.build()
.unwrap(),
);
let expected_manifest = Manifest {
metadata: ManifestMetadata {
schema_id: 0,
schema: schema.clone(),
partition_spec: PartitionSpec::builder(schema).with_spec_id(0).build().unwrap(),
content: ManifestContentType::Data,
format_version: FormatVersion::V2,
},
entries: vec![Arc::new(ManifestEntry {
status: ManifestStatus::Added,
snapshot_id: Some(2),
sequence_number: None,
file_sequence_number: None,
data_file: DataFile {
content: DataContentType::Data,
file_format: DataFileFormat::Parquet,
file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
partition: Struct::empty(),
record_count: 1,
file_size_in_bytes: 5442,
column_sizes: HashMap::from([
(1, 61),
(2, 73),
(3, 61),
]),
value_counts: HashMap::default(),
null_value_counts: HashMap::default(),
nan_value_counts: HashMap::new(),
lower_bounds: HashMap::from([
(1, Datum::long(1)),
(2, Datum::int(2)),
]),
upper_bounds: HashMap::from([
(1, Datum::long(1)),
(2, Datum::int(2)),
]),
key_metadata: None,
split_offsets: vec![4],
equality_ids: vec![],
sort_order_id: None,
},
})],
};
assert_eq!(actual_manifest, expected_manifest);
}
#[tokio::test]
async fn test_manifest_summary() {
let schema = Arc::new(
Schema::builder()
.with_fields(vec![
Arc::new(NestedField::optional(
1,
"time",
Type::Primitive(PrimitiveType::Date),
)),
Arc::new(NestedField::optional(
2,
"v_float",
Type::Primitive(PrimitiveType::Float),
)),
Arc::new(NestedField::optional(
3,
"v_double",
Type::Primitive(PrimitiveType::Double),
)),
])
.build()
.unwrap(),
);
let partition_spec = PartitionSpec::builder(schema.clone())
.with_spec_id(0)
.add_partition_field("time", "year_of_time", Transform::Year)
.unwrap()
.add_partition_field("v_float", "f", Transform::Identity)
.unwrap()
.add_partition_field("v_double", "d", Transform::Identity)
.unwrap()
.build()
.unwrap();
let metadata = ManifestMetadata {
schema_id: 0,
schema,
partition_spec,
content: ManifestContentType::Data,
format_version: FormatVersion::V2,
};
let entries = vec![
ManifestEntry {
status: ManifestStatus::Added,
snapshot_id: None,
sequence_number: None,
file_sequence_number: None,
data_file: DataFile {
content: DataContentType::Data,
file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
file_format: DataFileFormat::Parquet,
partition: Struct::from_iter(
vec![
Some(Literal::int(2021)),
Some(Literal::float(1.0)),
Some(Literal::double(2.0)),
]
),
record_count: 1,
file_size_in_bytes: 5442,
column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
nan_value_counts: HashMap::new(),
lower_bounds: HashMap::new(),
upper_bounds: HashMap::new(),
key_metadata: None,
split_offsets: vec![4],
equality_ids: Vec::new(),
sort_order_id: None,
}
},
ManifestEntry {
status: ManifestStatus::Added,
snapshot_id: None,
sequence_number: None,
file_sequence_number: None,
data_file: DataFile {
content: DataContentType::Data,
file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
file_format: DataFileFormat::Parquet,
partition: Struct::from_iter(
vec![
Some(Literal::int(1111)),
Some(Literal::float(15.5)),
Some(Literal::double(25.5)),
]
),
record_count: 1,
file_size_in_bytes: 5442,
column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
nan_value_counts: HashMap::new(),
lower_bounds: HashMap::new(),
upper_bounds: HashMap::new(),
key_metadata: None,
split_offsets: vec![4],
equality_ids: Vec::new(),
sort_order_id: None,
}
},
ManifestEntry {
status: ManifestStatus::Added,
snapshot_id: None,
sequence_number: None,
file_sequence_number: None,
data_file: DataFile {
content: DataContentType::Data,
file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
file_format: DataFileFormat::Parquet,
partition: Struct::from_iter(
vec![
Some(Literal::int(1211)),
Some(Literal::float(f32::NAN)),
Some(Literal::double(1.0)),
]
),
record_count: 1,
file_size_in_bytes: 5442,
column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
nan_value_counts: HashMap::new(),
lower_bounds: HashMap::new(),
upper_bounds: HashMap::new(),
key_metadata: None,
split_offsets: vec![4],
equality_ids: Vec::new(),
sort_order_id: None,
}
},
ManifestEntry {
status: ManifestStatus::Added,
snapshot_id: None,
sequence_number: None,
file_sequence_number: None,
data_file: DataFile {
content: DataContentType::Data,
file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
file_format: DataFileFormat::Parquet,
partition: Struct::from_iter(
vec![
Some(Literal::int(1111)),
None,
Some(Literal::double(11.0)),
]
),
record_count: 1,
file_size_in_bytes: 5442,
column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
nan_value_counts: HashMap::new(),
lower_bounds: HashMap::new(),
upper_bounds: HashMap::new(),
key_metadata: None,
split_offsets: vec![4],
equality_ids: Vec::new(),
sort_order_id: None,
}
},
];
let tmp_dir = TempDir::new().unwrap();
let path = tmp_dir.path().join("test_manifest.avro");
let io = FileIOBuilder::new_fs_io().build().unwrap();
let output_file = io.new_output(path.to_str().unwrap()).unwrap();
let mut writer = ManifestWriterBuilder::new(
output_file,
Some(1),
vec![],
metadata.schema.clone(),
metadata.partition_spec.clone(),
)
.build_v2_data();
for entry in &entries {
writer.add_entry(entry.clone()).unwrap();
}
let res = writer.write_manifest_file().await.unwrap();
assert_eq!(res.partitions.len(), 3);
assert_eq!(res.partitions[0].lower_bound, Some(Datum::int(1111)));
assert_eq!(res.partitions[0].upper_bound, Some(Datum::int(2021)));
assert!(!res.partitions[0].contains_null);
assert_eq!(res.partitions[0].contains_nan, Some(false));
assert_eq!(res.partitions[1].lower_bound, Some(Datum::float(1.0)));
assert_eq!(res.partitions[1].upper_bound, Some(Datum::float(15.5)));
assert!(res.partitions[1].contains_null);
assert_eq!(res.partitions[1].contains_nan, Some(true));
assert_eq!(res.partitions[2].lower_bound, Some(Datum::double(1.0)));
assert_eq!(res.partitions[2].upper_bound, Some(Datum::double(25.5)));
assert!(!res.partitions[2].contains_null);
assert_eq!(res.partitions[2].contains_nan, Some(false));
}
#[tokio::test]
async fn test_add_delete_existing() {
let schema = Arc::new(
Schema::builder()
.with_fields(vec![
Arc::new(NestedField::optional(
1,
"id",
Type::Primitive(PrimitiveType::Int),
)),
Arc::new(NestedField::optional(
2,
"name",
Type::Primitive(PrimitiveType::String),
)),
])
.build()
.unwrap(),
);
let metadata = ManifestMetadata {
schema_id: 0,
schema: schema.clone(),
partition_spec: PartitionSpec::builder(schema)
.with_spec_id(0)
.build()
.unwrap(),
content: ManifestContentType::Data,
format_version: FormatVersion::V2,
};
let mut entries = vec![
ManifestEntry {
status: ManifestStatus::Added,
snapshot_id: None,
sequence_number: Some(1),
file_sequence_number: Some(1),
data_file: DataFile {
content: DataContentType::Data,
file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
file_format: DataFileFormat::Parquet,
partition: Struct::empty(),
record_count: 1,
file_size_in_bytes: 5442,
column_sizes: HashMap::from([(1, 61), (2, 73)]),
value_counts: HashMap::from([(1, 1), (2, 1)]),
null_value_counts: HashMap::from([(1, 0), (2, 0)]),
nan_value_counts: HashMap::new(),
lower_bounds: HashMap::new(),
upper_bounds: HashMap::new(),
key_metadata: Some(Vec::new()),
split_offsets: vec![4],
equality_ids: Vec::new(),
sort_order_id: None,
},
},
ManifestEntry {
status: ManifestStatus::Deleted,
snapshot_id: Some(1),
sequence_number: Some(1),
file_sequence_number: Some(1),
data_file: DataFile {
content: DataContentType::Data,
file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
file_format: DataFileFormat::Parquet,
partition: Struct::empty(),
record_count: 1,
file_size_in_bytes: 5442,
column_sizes: HashMap::from([(1, 61), (2, 73)]),
value_counts: HashMap::from([(1, 1), (2, 1)]),
null_value_counts: HashMap::from([(1, 0), (2, 0)]),
nan_value_counts: HashMap::new(),
lower_bounds: HashMap::new(),
upper_bounds: HashMap::new(),
key_metadata: Some(Vec::new()),
split_offsets: vec![4],
equality_ids: Vec::new(),
sort_order_id: None,
},
},
ManifestEntry {
status: ManifestStatus::Existing,
snapshot_id: Some(1),
sequence_number: Some(1),
file_sequence_number: Some(1),
data_file: DataFile {
content: DataContentType::Data,
file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(),
file_format: DataFileFormat::Parquet,
partition: Struct::empty(),
record_count: 1,
file_size_in_bytes: 5442,
column_sizes: HashMap::from([(1, 61), (2, 73)]),
value_counts: HashMap::from([(1, 1), (2, 1)]),
null_value_counts: HashMap::from([(1, 0), (2, 0)]),
nan_value_counts: HashMap::new(),
lower_bounds: HashMap::new(),
upper_bounds: HashMap::new(),
key_metadata: Some(Vec::new()),
split_offsets: vec![4],
equality_ids: Vec::new(),
sort_order_id: None,
},
},
];
let tmp_dir = TempDir::new().unwrap();
let path = tmp_dir.path().join("test_manifest.avro");
let io = FileIOBuilder::new_fs_io().build().unwrap();
let output_file = io.new_output(path.to_str().unwrap()).unwrap();
let mut writer = ManifestWriterBuilder::new(
output_file,
Some(3),
vec![],
metadata.schema.clone(),
metadata.partition_spec.clone(),
)
.build_v2_data();
writer.add_entry(entries[0].clone()).unwrap();
writer.add_delete_entry(entries[1].clone()).unwrap();
writer.add_existing_entry(entries[2].clone()).unwrap();
writer.write_manifest_file().await.unwrap();
let actual_manifest =
Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice())
.unwrap();
entries[0].snapshot_id = Some(3);
entries[1].snapshot_id = Some(3);
entries[0].file_sequence_number = None;
assert_eq!(actual_manifest, Manifest::new(metadata, entries));
}
#[tokio::test]
async fn test_data_file_serialize_deserialize() {
let schema = Arc::new(
Schema::builder()
.with_fields(vec![
Arc::new(NestedField::optional(
1,
"v1",
Type::Primitive(PrimitiveType::Int),
)),
Arc::new(NestedField::optional(
2,
"v2",
Type::Primitive(PrimitiveType::String),
)),
Arc::new(NestedField::optional(
3,
"v3",
Type::Primitive(PrimitiveType::String),
)),
])
.build()
.unwrap(),
);
let data_files = vec![DataFile {
content: DataContentType::Data,
file_path: "s3://testbucket/iceberg_data/iceberg_ctl/iceberg_db/iceberg_tbl/data/00000-7-45268d71-54eb-476c-b42c-942d880c04a1-00001.parquet".to_string(),
file_format: DataFileFormat::Parquet,
partition: Struct::empty(),
record_count: 1,
file_size_in_bytes: 875,
column_sizes: HashMap::from([(1,47),(2,48),(3,52)]),
value_counts: HashMap::from([(1,1),(2,1),(3,1)]),
null_value_counts: HashMap::from([(1,0),(2,0),(3,0)]),
nan_value_counts: HashMap::new(),
lower_bounds: HashMap::from([(1,Datum::int(1)),(2,Datum::string("a")),(3,Datum::string("AC/DC"))]),
upper_bounds: HashMap::from([(1,Datum::int(1)),(2,Datum::string("a")),(3,Datum::string("AC/DC"))]),
key_metadata: None,
split_offsets: vec![4],
equality_ids: vec![],
sort_order_id: Some(0),
}];
let mut buffer = Vec::new();
let _ = write_data_files_to_avro(
&mut buffer,
data_files.clone().into_iter(),
&StructType::new(vec![]),
FormatVersion::V2,
)
.unwrap();
let actual_data_file = read_data_files_from_avro(
&mut Cursor::new(buffer),
&schema,
&StructType::new(vec![]),
FormatVersion::V2,
)
.unwrap();
assert_eq!(data_files, actual_data_file);
}
}