use std::cmp::Ordering;
use std::collections::HashMap;
use std::future::Future;
use std::mem::discriminant;
use std::ops::RangeFrom;
use uuid::Uuid;
use crate::error::Result;
use crate::io::OutputFile;
use crate::spec::{
DataFile, DataFileFormat, FormatVersion, ManifestEntry, ManifestFile, ManifestListWriter,
ManifestWriterBuilder, NullOrder, Operation, Snapshot, SnapshotReference, SnapshotRetention,
SortDirection, SortField, SortOrder, Struct, StructType, Summary, Transform, MAIN_BRANCH,
};
use crate::table::Table;
use crate::TableUpdate::UpgradeFormatVersion;
use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
const META_ROOT_PATH: &str = "metadata";
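/// A table transaction: it buffers [`TableUpdate`]s and [`TableRequirement`]s
/// and applies them atomically through a [`Catalog`] on commit.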
pub struct Transaction<'a> {
table: &'a Table,
updates: Vec<TableUpdate>,
requirements: Vec<TableRequirement>,
}
impl<'a> Transaction<'a> {
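/// Creates a new transaction against the given table.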
pub fn new(table: &'a Table) -> Self {
Self {
table,
updates: vec![],
requirements: vec![],
}
}
fn append_updates(&mut self, updates: Vec<TableUpdate>) -> Result<()> {
for update in &updates {
for up in &self.updates {
if discriminant(up) == discriminant(update) {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Cannot apply update with same type at same time: {:?}",
update
),
));
}
}
}
self.updates.extend(updates);
Ok(())
}
fn append_requirements(&mut self, requirements: Vec<TableRequirement>) -> Result<()> {
self.requirements.extend(requirements);
Ok(())
}
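/// Upgrades the table's format version. Downgrading is rejected, and
/// upgrading to the current version is a no-op.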
pub fn upgrade_table_version(mut self, format_version: FormatVersion) -> Result<Self> {
let current_version = self.table.metadata().format_version();
match current_version.cmp(&format_version) {
Ordering::Greater => {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Cannot downgrade table version from {} to {}",
current_version, format_version
),
));
}
Ordering::Less => {
self.append_updates(vec![UpgradeFormatVersion { format_version }])?;
}
Ordering::Equal => {
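// Already at the requested version; nothing to do.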
}
}
Ok(self)
}
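/// Sets (or overwrites) table properties.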
pub fn set_properties(mut self, props: HashMap<String, String>) -> Result<Self> {
self.append_updates(vec![TableUpdate::SetProperties { updates: props }])?;
Ok(self)
}
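/// Generates a random, non-negative snapshot id that does not collide with
/// any existing snapshot of the table.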
fn generate_unique_snapshot_id(&self) -> i64 {
let generate_random_id = || -> i64 {
let (lhs, rhs) = Uuid::new_v4().as_u64_pair();
let snapshot_id = (lhs ^ rhs) as i64;
if snapshot_id < 0 {
-snapshot_id
} else {
snapshot_id
}
};
let mut snapshot_id = generate_random_id();
while self
.table
.metadata()
.snapshots()
.any(|s| s.snapshot_id() == snapshot_id)
{
snapshot_id = generate_random_id();
}
snapshot_id
}
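/// Creates a fast-append action, which adds data files by writing a new
/// manifest instead of rewriting existing ones.
///
/// A minimal sketch of the intended flow; `table`, `catalog`, and
/// `data_file` are assumed to exist (see the tests below for how a
/// [`DataFile`] can be built):
///
/// ```ignore
/// let tx = Transaction::new(&table);
/// let mut action = tx.fast_append(None, vec![])?;
/// action.add_data_files(vec![data_file])?;
/// let tx = action.apply().await?;
/// let table = tx.commit(&catalog).await?;
/// ```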
pub fn fast_append(
self,
commit_uuid: Option<Uuid>,
key_metadata: Vec<u8>,
) -> Result<FastAppendAction<'a>> {
let snapshot_id = self.generate_unique_snapshot_id();
FastAppendAction::new(
self,
snapshot_id,
commit_uuid.unwrap_or_else(Uuid::now_v7),
key_metadata,
HashMap::new(),
)
}
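/// Creates a replace-sort-order action for this transaction.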
pub fn replace_sort_order(self) -> ReplaceSortOrderAction<'a> {
ReplaceSortOrderAction {
tx: self,
sort_fields: vec![],
}
}
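/// Removes the given property keys from the table.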
pub fn remove_properties(mut self, keys: Vec<String>) -> Result<Self> {
self.append_updates(vec![TableUpdate::RemoveProperties { removals: keys }])?;
Ok(self)
}
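/// Commits all staged updates and requirements through the catalog and
/// returns the updated table.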
pub async fn commit(self, catalog: &impl Catalog) -> Result<Table> {
let table_commit = TableCommit::builder()
.ident(self.table.identifier().clone())
.updates(self.updates)
.requirements(self.requirements)
.build();
catalog.update_table(table_commit).await
}
}
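/// Transaction action for fast-appending data files to a table: each apply
/// produces a new snapshot whose added files live in a freshly written
/// manifest.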
pub struct FastAppendAction<'a> {
snapshot_produce_action: SnapshotProduceAction<'a>,
}
impl<'a> FastAppendAction<'a> {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
tx: Transaction<'a>,
snapshot_id: i64,
commit_uuid: Uuid,
key_metadata: Vec<u8>,
snapshot_properties: HashMap<String, String>,
) -> Result<Self> {
Ok(Self {
snapshot_produce_action: SnapshotProduceAction::new(
tx,
snapshot_id,
key_metadata,
commit_uuid,
snapshot_properties,
)?,
})
}
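/// Adds data files to the snapshot under construction.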
pub fn add_data_files(
&mut self,
data_files: impl IntoIterator<Item = DataFile>,
) -> Result<&mut Self> {
self.snapshot_produce_action.add_data_files(data_files)?;
Ok(self)
}
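/// Finishes building the snapshot and stages it on the transaction.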
pub async fn apply(self) -> Result<Transaction<'a>> {
self.snapshot_produce_action
.apply(FastAppendOperation, DefaultManifestProcess)
.await
}
}
struct FastAppendOperation;
impl SnapshotProduceOperation for FastAppendOperation {
fn operation(&self) -> Operation {
Operation::Append
}
async fn delete_entries(
&self,
_snapshot_produce: &SnapshotProduceAction<'_>,
) -> Result<Vec<ManifestEntry>> {
Ok(vec![])
}
async fn existing_manifest(
&self,
snapshot_produce: &SnapshotProduceAction<'_>,
) -> Result<Vec<ManifestFile>> {
let Some(snapshot) = snapshot_produce.tx.table.metadata().current_snapshot() else {
return Ok(vec![]);
};
let manifest_list = snapshot
.load_manifest_list(
snapshot_produce.tx.table.file_io(),
&snapshot_produce.tx.table.metadata_ref(),
)
.await?;
Ok(manifest_list
.entries()
.iter()
.filter(|entry| entry.has_added_files() || entry.has_existing_files())
.cloned()
.collect())
}
}
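/// Operation-specific hooks used while producing a new snapshot.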
trait SnapshotProduceOperation: Send + Sync {
fn operation(&self) -> Operation;
#[allow(unused)]
fn delete_entries(
&self,
snapshot_produce: &SnapshotProduceAction,
) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
fn existing_manifest(
&self,
snapshot_produce: &SnapshotProduceAction,
) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
}
struct DefaultManifestProcess;
impl ManifestProcess for DefaultManifestProcess {
fn process_manifests(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> {
manifests
}
}
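/// Post-processing hook applied to the full manifest list before it is
/// written.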
trait ManifestProcess: Send + Sync {
fn process_manifests(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile>;
}
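/// Shared machinery for actions that produce a new snapshot: buffers added
/// data files, writes manifests and the manifest list, and stages the
/// resulting snapshot on the transaction.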
struct SnapshotProduceAction<'a> {
tx: Transaction<'a>,
snapshot_id: i64,
key_metadata: Vec<u8>,
commit_uuid: Uuid,
snapshot_properties: HashMap<String, String>,
added_data_files: Vec<DataFile>,
manifest_counter: RangeFrom<u64>,
}
impl<'a> SnapshotProduceAction<'a> {
pub(crate) fn new(
tx: Transaction<'a>,
snapshot_id: i64,
key_metadata: Vec<u8>,
commit_uuid: Uuid,
snapshot_properties: HashMap<String, String>,
) -> Result<Self> {
Ok(Self {
tx,
snapshot_id,
commit_uuid,
snapshot_properties,
added_data_files: vec![],
manifest_counter: (0..),
key_metadata,
})
}
fn validate_partition_value(
partition_value: &Struct,
partition_type: &StructType,
) -> Result<()> {
if partition_value.fields().len() != partition_type.fields().len() {
return Err(Error::new(
ErrorKind::DataInvalid,
"Partition value is not compatible with partition type",
));
}
for (value, field) in partition_value.fields().iter().zip(partition_type.fields()) {
let primitive_type = field.field_type.as_primitive_type().ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
"Partition field should only be primitive type.",
)
})?;
// Return an error instead of panicking when the value is not a primitive literal.
let primitive_literal = value.as_primitive_literal().ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
"Partition value must be a primitive literal",
)
})?;
if !primitive_type.compatible(&primitive_literal) {
return Err(Error::new(
ErrorKind::DataInvalid,
"Partition value is not compatible with partition type",
));
}
}
Ok(())
}
pub fn add_data_files(
&mut self,
data_files: impl IntoIterator<Item = DataFile>,
) -> Result<&mut Self> {
let data_files: Vec<DataFile> = data_files.into_iter().collect();
for data_file in &data_files {
if data_file.content_type() != crate::spec::DataContentType::Data {
return Err(Error::new(
ErrorKind::DataInvalid,
"Only data content type is allowed for fast append",
));
}
Self::validate_partition_value(
data_file.partition(),
self.tx.table.metadata().default_partition_type(),
)?;
}
self.added_data_files.extend(data_files);
Ok(self)
}
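/// Creates the output file for the next manifest, named
/// `<table-location>/metadata/<commit-uuid>-m<counter>.avro`.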
fn new_manifest_output(&mut self) -> Result<OutputFile> {
let new_manifest_path = format!(
"{}/{}/{}-m{}.{}",
self.tx.table.metadata().location(),
META_ROOT_PATH,
self.commit_uuid,
self.manifest_counter.next().unwrap(),
DataFileFormat::Avro
);
self.tx.table.file_io().new_output(new_manifest_path)
}
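/// Writes the buffered data files into a single new manifest and returns
/// its [`ManifestFile`] descriptor.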
async fn write_added_manifest(&mut self) -> Result<ManifestFile> {
let added_data_files = std::mem::take(&mut self.added_data_files);
let snapshot_id = self.snapshot_id;
let manifest_entries = added_data_files.into_iter().map(|data_file| {
let builder = ManifestEntry::builder()
.status(crate::spec::ManifestStatus::Added)
.data_file(data_file);
if self.tx.table.metadata().format_version() == FormatVersion::V1 {
builder.snapshot_id(snapshot_id).build()
} else {
builder.build()
}
});
let mut writer = {
let builder = ManifestWriterBuilder::new(
self.new_manifest_output()?,
Some(self.snapshot_id),
self.key_metadata.clone(),
self.tx.table.metadata().current_schema().clone(),
self.tx
.table
.metadata()
.default_partition_spec()
.as_ref()
.clone(),
);
if self.tx.table.metadata().format_version() == FormatVersion::V1 {
builder.build_v1()
} else {
builder.build_v2_data()
}
};
for entry in manifest_entries {
writer.add_entry(entry)?;
}
writer.write_manifest_file().await
}
async fn manifest_file<OP: SnapshotProduceOperation, MP: ManifestProcess>(
&mut self,
snapshot_produce_operation: &OP,
manifest_process: &MP,
) -> Result<Vec<ManifestFile>> {
let added_manifest = self.write_added_manifest().await?;
let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?;
let mut manifest_files = vec![added_manifest];
manifest_files.extend(existing_manifests);
let manifest_files = manifest_process.process_manifests(manifest_files);
Ok(manifest_files)
}
fn summary<OP: SnapshotProduceOperation>(&self, snapshot_produce_operation: &OP) -> Summary {
Summary {
operation: snapshot_produce_operation.operation(),
additional_properties: self.snapshot_properties.clone(),
}
}
fn generate_manifest_list_file_path(&self, attempt: i64) -> String {
format!(
"{}/{}/snap-{}-{}-{}.{}",
self.tx.table.metadata().location(),
META_ROOT_PATH,
self.snapshot_id,
attempt,
self.commit_uuid,
DataFileFormat::Avro
)
}
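/// Writes the manifest list, builds the new snapshot, and stages the
/// snapshot updates and their requirements on the transaction.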
pub async fn apply<OP: SnapshotProduceOperation, MP: ManifestProcess>(
mut self,
snapshot_produce_operation: OP,
process: MP,
) -> Result<Transaction<'a>> {
let new_manifests = self
.manifest_file(&snapshot_produce_operation, &process)
.await?;
let next_seq_num = self.tx.table.metadata().next_sequence_number();
let summary = self.summary(&snapshot_produce_operation);
let manifest_list_path = self.generate_manifest_list_file_path(0);
let mut manifest_list_writer = match self.tx.table.metadata().format_version() {
FormatVersion::V1 => ManifestListWriter::v1(
self.tx
.table
.file_io()
.new_output(manifest_list_path.clone())?,
self.snapshot_id,
self.tx.table.metadata().current_snapshot_id(),
),
FormatVersion::V2 => ManifestListWriter::v2(
self.tx
.table
.file_io()
.new_output(manifest_list_path.clone())?,
self.snapshot_id,
self.tx.table.metadata().current_snapshot_id(),
next_seq_num,
),
};
manifest_list_writer.add_manifests(new_manifests.into_iter())?;
manifest_list_writer.close().await?;
let commit_ts = chrono::Utc::now().timestamp_millis();
let new_snapshot = Snapshot::builder()
.with_manifest_list(manifest_list_path)
.with_snapshot_id(self.snapshot_id)
.with_parent_snapshot_id(self.tx.table.metadata().current_snapshot_id())
.with_sequence_number(next_seq_num)
.with_summary(summary)
.with_schema_id(self.tx.table.metadata().current_schema_id())
.with_timestamp_ms(commit_ts)
.build();
self.tx.append_updates(vec![
TableUpdate::AddSnapshot {
snapshot: new_snapshot,
},
TableUpdate::SetSnapshotRef {
ref_name: MAIN_BRANCH.to_string(),
reference: SnapshotReference::new(
self.snapshot_id,
SnapshotRetention::branch(None, None, None),
),
},
])?;
self.tx.append_requirements(vec![
TableRequirement::UuidMatch {
uuid: self.tx.table.metadata().uuid(),
},
TableRequirement::RefSnapshotIdMatch {
r#ref: MAIN_BRANCH.to_string(),
snapshot_id: self.tx.table.metadata().current_snapshot_id(),
},
])?;
Ok(self.tx)
}
}
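/// Transaction action for replacing the table's sort order.
///
/// A minimal sketch, assuming the table schema has a sortable column named
/// `id`:
///
/// ```ignore
/// let tx = Transaction::new(&table)
///     .replace_sort_order()
///     .asc("id", NullOrder::First)?
///     .apply()?;
/// ```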
pub struct ReplaceSortOrderAction<'a> {
tx: Transaction<'a>,
sort_fields: Vec<SortField>,
}
impl<'a> ReplaceSortOrderAction<'a> {
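/// Adds an ascending sort field on the named column, using an identity
/// transform.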
pub fn asc(self, name: &str, null_order: NullOrder) -> Result<Self> {
self.add_sort_field(name, SortDirection::Ascending, null_order)
}
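/// Adds a descending sort field on the named column, using an identity
/// transform.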
pub fn desc(self, name: &str, null_order: NullOrder) -> Result<Self> {
self.add_sort_field(name, SortDirection::Descending, null_order)
}
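/// Builds the new sort order and stages the corresponding updates and
/// requirements on the transaction.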
pub fn apply(mut self) -> Result<Transaction<'a>> {
let unbound_sort_order = SortOrder::builder()
.with_fields(self.sort_fields)
.build_unbound()?;
let updates = vec![
TableUpdate::AddSortOrder {
sort_order: unbound_sort_order,
},
TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
];
let requirements = vec![
TableRequirement::CurrentSchemaIdMatch {
current_schema_id: self.tx.table.metadata().current_schema().schema_id(),
},
TableRequirement::DefaultSortOrderIdMatch {
default_sort_order_id: self.tx.table.metadata().default_sort_order().order_id,
},
];
self.tx.append_requirements(requirements)?;
self.tx.append_updates(updates)?;
Ok(self.tx)
}
fn add_sort_field(
mut self,
name: &str,
sort_direction: SortDirection,
null_order: NullOrder,
) -> Result<Self> {
let field_id = self
.tx
.table
.metadata()
.current_schema()
.field_id_by_name(name)
.ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!("Cannot find field {} in table schema", name),
)
})?;
let sort_field = SortField::builder()
.source_id(field_id)
.transform(Transform::Identity)
.direction(sort_direction)
.null_order(null_order)
.build();
self.sort_fields.push(sort_field);
Ok(self)
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::fs::File;
use std::io::BufReader;
use crate::io::FileIOBuilder;
use crate::spec::{
DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Struct,
TableMetadata,
};
use crate::table::Table;
use crate::transaction::{Transaction, MAIN_BRANCH};
use crate::{TableIdent, TableRequirement, TableUpdate};
fn make_v1_table() -> Table {
let file = File::open(format!(
"{}/testdata/table_metadata/{}",
env!("CARGO_MANIFEST_DIR"),
"TableMetadataV1Valid.json"
))
.unwrap();
let reader = BufReader::new(file);
let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
Table::builder()
.metadata(resp)
.metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
.identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
.file_io(FileIOBuilder::new("memory").build().unwrap())
.build()
.unwrap()
}
fn make_v2_table() -> Table {
let file = File::open(format!(
"{}/testdata/table_metadata/{}",
env!("CARGO_MANIFEST_DIR"),
"TableMetadataV2Valid.json"
))
.unwrap();
let reader = BufReader::new(file);
let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
Table::builder()
.metadata(resp)
.metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
.identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
.file_io(FileIOBuilder::new("memory").build().unwrap())
.build()
.unwrap()
}
fn make_v2_minimal_table() -> Table {
let file = File::open(format!(
"{}/testdata/table_metadata/{}",
env!("CARGO_MANIFEST_DIR"),
"TableMetadataV2ValidMinimal.json"
))
.unwrap();
let reader = BufReader::new(file);
let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
Table::builder()
.metadata(resp)
.metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
.identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
.file_io(FileIOBuilder::new("memory").build().unwrap())
.build()
.unwrap()
}
#[test]
fn test_upgrade_table_version_v1_to_v2() {
let table = make_v1_table();
let tx = Transaction::new(&table);
let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap();
assert_eq!(
vec![TableUpdate::UpgradeFormatVersion {
format_version: FormatVersion::V2
}],
tx.updates
);
}
#[test]
fn test_upgrade_table_version_v2_to_v2() {
let table = make_v2_table();
let tx = Transaction::new(&table);
let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap();
assert!(
tx.updates.is_empty(),
"Upgrade table to same version should not generate any updates"
);
assert!(
tx.requirements.is_empty(),
"Upgrade table to same version should not generate any requirements"
);
}
#[test]
fn test_downgrade_table_version() {
let table = make_v2_table();
let tx = Transaction::new(&table);
let tx = tx.upgrade_table_version(FormatVersion::V1);
assert!(tx.is_err(), "Downgrade table version should fail!");
}
#[test]
fn test_set_table_property() {
let table = make_v2_table();
let tx = Transaction::new(&table);
let tx = tx
.set_properties(HashMap::from([("a".to_string(), "b".to_string())]))
.unwrap();
assert_eq!(
vec![TableUpdate::SetProperties {
updates: HashMap::from([("a".to_string(), "b".to_string())])
}],
tx.updates
);
}
#[test]
fn test_remove_property() {
let table = make_v2_table();
let tx = Transaction::new(&table);
let tx = tx
.remove_properties(vec!["a".to_string(), "b".to_string()])
.unwrap();
assert_eq!(
vec![TableUpdate::RemoveProperties {
removals: vec!["a".to_string(), "b".to_string()]
}],
tx.updates
);
}
#[test]
fn test_replace_sort_order() {
let table = make_v2_table();
let tx = Transaction::new(&table);
let tx = tx.replace_sort_order().apply().unwrap();
assert_eq!(
vec![
TableUpdate::AddSortOrder {
sort_order: Default::default()
},
TableUpdate::SetDefaultSortOrder { sort_order_id: -1 }
],
tx.updates
);
assert_eq!(
vec![
TableRequirement::CurrentSchemaIdMatch {
current_schema_id: 1
},
TableRequirement::DefaultSortOrderIdMatch {
default_sort_order_id: 3
}
],
tx.requirements
);
}
#[tokio::test]
async fn test_fast_append_action() {
let table = make_v2_minimal_table();
let tx = Transaction::new(&table);
let mut action = tx.fast_append(None, vec![]).unwrap();
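// A string partition value does not match the table's partition type
// (long), so adding this file must fail.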
let data_file = DataFileBuilder::default()
.content(DataContentType::Data)
.file_path("test/3.parquet".to_string())
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::string("test"))]))
.build()
.unwrap();
assert!(action.add_data_files(vec![data_file.clone()]).is_err());
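// A long partition value matches the partition type and is accepted.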
let data_file = DataFileBuilder::default()
.content(DataContentType::Data)
.file_path("test/3.parquet".to_string())
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(300))]))
.build()
.unwrap();
action.add_data_files(vec![data_file.clone()]).unwrap();
let tx = action.apply().await.unwrap();
assert!(matches!(
(&tx.updates[0], &tx.updates[1]),
(
TableUpdate::AddSnapshot { snapshot },
TableUpdate::SetSnapshotRef { reference, ref_name }
) if snapshot.snapshot_id() == reference.snapshot_id && ref_name == MAIN_BRANCH
));
assert_eq!(
vec![
TableRequirement::UuidMatch {
uuid: tx.table.metadata().uuid()
},
TableRequirement::RefSnapshotIdMatch {
r#ref: MAIN_BRANCH.to_string(),
snapshot_id: tx.table.metadata().current_snapshot_id()
}
],
tx.requirements
);
let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &tx.updates[0] {
snapshot
} else {
unreachable!()
};
let manifest_list = new_snapshot
.load_manifest_list(table.file_io(), table.metadata())
.await
.unwrap();
assert_eq!(1, manifest_list.entries().len());
assert_eq!(
manifest_list.entries()[0].sequence_number,
new_snapshot.sequence_number()
);
let manifest = manifest_list.entries()[0]
.load_manifest(table.file_io())
.await
.unwrap();
assert_eq!(1, manifest.entries().len());
assert_eq!(
new_snapshot.sequence_number(),
manifest.entries()[0]
.sequence_number()
.expect("Inherit sequence number by load manifest")
);
assert_eq!(
new_snapshot.snapshot_id(),
manifest.entries()[0].snapshot_id().unwrap()
);
assert_eq!(data_file, *manifest.entries()[0].data_file());
}
#[test]
fn test_do_same_update_in_same_transaction() {
let table = make_v2_table();
let tx = Transaction::new(&table);
let tx = tx
.remove_properties(vec!["a".to_string(), "b".to_string()])
.unwrap();
let tx = tx.remove_properties(vec!["c".to_string(), "d".to_string()]);
assert!(
tx.is_err(),
"Should not allow to do same kinds update in same transaction"
);
}
}