mod append;
mod snapshot;
mod sort_order;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::mem::discriminant;
use uuid::Uuid;
use crate::error::Result;
use crate::spec::FormatVersion;
use crate::table::Table;
use crate::transaction::append::FastAppendAction;
use crate::transaction::sort_order::ReplaceSortOrderAction;
use crate::TableUpdate::UpgradeFormatVersion;
use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
/// A pending set of changes against a borrowed [`Table`].
///
/// Builder-style methods on this type queue [`TableUpdate`]s and
/// [`TableRequirement`]s; `commit` then sends everything that was queued to a
/// catalog in a single `TableCommit`.
pub struct Transaction<'a> {
/// The table this transaction reads metadata from and will be committed against.
table: &'a Table,
/// Updates queued so far, in the order they were appended.
updates: Vec<TableUpdate>,
/// Requirements queued so far, in the order they were appended.
requirements: Vec<TableRequirement>,
}
impl<'a> Transaction<'a> {
pub fn new(table: &'a Table) -> Self {
Self {
table,
updates: vec![],
requirements: vec![],
}
}
/// Queues `updates` on this transaction.
///
/// Each incoming update is compared, by enum variant only, against the
/// updates queued by earlier calls. Incoming updates are not compared against
/// each other.
///
/// # Errors
///
/// Returns `ErrorKind::DataInvalid` for the first incoming update whose
/// variant is already queued. In that case nothing is appended.
fn append_updates(&mut self, updates: Vec<TableUpdate>) -> Result<()> {
    // First incoming update (in input order) that clashes with a queued one.
    let conflicting = updates.iter().find(|incoming| {
        let kind = discriminant(*incoming);
        self.updates
            .iter()
            .any(|existing| discriminant(existing) == kind)
    });

    if let Some(update) = conflicting {
        return Err(Error::new(
            ErrorKind::DataInvalid,
            format!(
                "Cannot apply update with same type at same time: {:?}",
                update
            ),
        ));
    }

    self.updates.extend(updates);
    Ok(())
}
/// Queues `requirements` after any requirements already on this transaction.
///
/// No validation is performed; this currently always returns `Ok(())`.
fn append_requirements(&mut self, requirements: Vec<TableRequirement>) -> Result<()> {
    let mut incoming = requirements;
    self.requirements.append(&mut incoming);
    Ok(())
}
/// Requests an upgrade of the table to `format_version`.
///
/// * Target newer than the table's current version: queues an
///   `UpgradeFormatVersion` update.
/// * Target equal to the current version: queues nothing.
///
/// # Errors
///
/// Returns `ErrorKind::DataInvalid` when `format_version` is older than the
/// table's current version. Also propagates any error from queuing the update,
/// such as an upgrade already being queued on this transaction.
pub fn upgrade_table_version(mut self, format_version: FormatVersion) -> Result<Self> {
    let current_version = self.table.metadata().format_version();
    let ordering = current_version.cmp(&format_version);

    // Downgrades are rejected outright.
    if ordering == Ordering::Greater {
        return Err(Error::new(
            ErrorKind::DataInvalid,
            format!(
                "Cannot downgrade table version from {} to {}",
                current_version, format_version
            ),
        ));
    }

    // Only a strictly newer target produces an update; equal is a no-op.
    if ordering == Ordering::Less {
        let upgrade = UpgradeFormatVersion { format_version };
        self.append_updates(vec![upgrade])?;
    }

    Ok(self)
}
/// Queues a `SetProperties` update carrying `props`.
///
/// # Errors
///
/// Fails if a `SetProperties` update is already queued on this transaction.
pub fn set_properties(mut self, props: HashMap<String, String>) -> Result<Self> {
    let update = TableUpdate::SetProperties { updates: props };
    self.append_updates(vec![update])?;
    Ok(self)
}
/// Returns a random, non-negative snapshot id that is not used by any
/// snapshot in the table's current metadata.
///
/// Candidates are derived from a fresh v4 UUID by XOR-ing its two 64-bit
/// halves. Generation is retried until the candidate differs from every
/// existing snapshot id.
fn generate_unique_snapshot_id(&self) -> i64 {
    let generate_random_id = || -> i64 {
        let (lhs, rhs) = Uuid::new_v4().as_u64_pair();
        // Clear the sign bit instead of negating a negative value: negating
        // `i64::MIN` overflows (panic with overflow checks, still negative
        // when wrapping). Masking always lands in `0..=i64::MAX`.
        ((lhs ^ rhs) & (i64::MAX as u64)) as i64
    };

    loop {
        let candidate = generate_random_id();
        let already_used = self
            .table
            .metadata()
            .snapshots()
            .any(|s| s.snapshot_id() == candidate);
        if !already_used {
            return candidate;
        }
    }
}
/// Consumes the transaction and starts a [`FastAppendAction`] on it.
///
/// A snapshot id not present in the table's current metadata is generated
/// for the action. When `commit_uuid` is `None`, a new UUID from
/// `Uuid::now_v7` is used instead. An empty map is passed as the final
/// constructor argument.
///
/// # Errors
///
/// Propagates any error returned by `FastAppendAction::new`.
pub fn fast_append(
    self,
    commit_uuid: Option<Uuid>,
    key_metadata: Vec<u8>,
) -> Result<FastAppendAction<'a>> {
    let snapshot_id = self.generate_unique_snapshot_id();
    let commit_uuid = match commit_uuid {
        Some(uuid) => uuid,
        None => Uuid::now_v7(),
    };
    FastAppendAction::new(self, snapshot_id, commit_uuid, key_metadata, HashMap::new())
}
/// Consumes the transaction and starts a [`ReplaceSortOrderAction`] on it
/// with an initially empty list of sort fields.
pub fn replace_sort_order(self) -> ReplaceSortOrderAction<'a> {
    let sort_fields = Vec::new();
    ReplaceSortOrderAction {
        tx: self,
        sort_fields,
    }
}
/// Queues a `RemoveProperties` update listing `keys`.
///
/// # Errors
///
/// Fails if a `RemoveProperties` update is already queued on this transaction.
pub fn remove_properties(mut self, keys: Vec<String>) -> Result<Self> {
    let update = TableUpdate::RemoveProperties { removals: keys };
    self.append_updates(vec![update])?;
    Ok(self)
}
pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
let table_commit = TableCommit::builder()
.ident(self.table.identifier().clone())
.updates(self.updates)
.requirements(self.requirements)
.build();
catalog.update_table(table_commit).await
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::fs::File;
use std::io::BufReader;
use crate::io::FileIOBuilder;
use crate::spec::{FormatVersion, TableMetadata};
use crate::table::Table;
use crate::transaction::Transaction;
use crate::{TableIdent, TableUpdate};
/// Test fixture: builds a [`Table`] from
/// `testdata/table_metadata/TableMetadataV1Valid.json` (relative to the crate
/// manifest directory), using a `FileIO` built for the `"memory"` scheme.
///
/// Panics if the file cannot be opened or parsed, or if building fails.
fn make_v1_table() -> Table {
    let metadata_path = format!(
        "{}/testdata/table_metadata/{}",
        env!("CARGO_MANIFEST_DIR"),
        "TableMetadataV1Valid.json"
    );
    let metadata_file = File::open(metadata_path).unwrap();
    let metadata: TableMetadata =
        serde_json::from_reader(BufReader::new(metadata_file)).unwrap();

    let identifier = TableIdent::from_strs(["ns1", "test1"]).unwrap();
    let file_io = FileIOBuilder::new("memory").build().unwrap();

    Table::builder()
        .metadata(metadata)
        .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
        .identifier(identifier)
        .file_io(file_io)
        .build()
        .unwrap()
}
/// Test fixture: builds a [`Table`] from
/// `testdata/table_metadata/TableMetadataV2Valid.json` (relative to the crate
/// manifest directory), using a `FileIO` built for the `"memory"` scheme.
///
/// Panics if the file cannot be opened or parsed, or if building fails.
pub fn make_v2_table() -> Table {
    let metadata_path = format!(
        "{}/testdata/table_metadata/{}",
        env!("CARGO_MANIFEST_DIR"),
        "TableMetadataV2Valid.json"
    );
    let metadata_file = File::open(metadata_path).unwrap();
    let metadata: TableMetadata =
        serde_json::from_reader(BufReader::new(metadata_file)).unwrap();

    let identifier = TableIdent::from_strs(["ns1", "test1"]).unwrap();
    let file_io = FileIOBuilder::new("memory").build().unwrap();

    Table::builder()
        .metadata(metadata)
        .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
        .identifier(identifier)
        .file_io(file_io)
        .build()
        .unwrap()
}
/// Test fixture: builds a [`Table`] from
/// `testdata/table_metadata/TableMetadataV2ValidMinimal.json` (relative to the
/// crate manifest directory), using a `FileIO` built for the `"memory"` scheme.
///
/// Panics if the file cannot be opened or parsed, or if building fails.
pub fn make_v2_minimal_table() -> Table {
    let metadata_path = format!(
        "{}/testdata/table_metadata/{}",
        env!("CARGO_MANIFEST_DIR"),
        "TableMetadataV2ValidMinimal.json"
    );
    let metadata_file = File::open(metadata_path).unwrap();
    let metadata: TableMetadata =
        serde_json::from_reader(BufReader::new(metadata_file)).unwrap();

    let identifier = TableIdent::from_strs(["ns1", "test1"]).unwrap();
    let file_io = FileIOBuilder::new("memory").build().unwrap();

    Table::builder()
        .metadata(metadata)
        .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
        .identifier(identifier)
        .file_io(file_io)
        .build()
        .unwrap()
}
/// Upgrading a V1 table to V2 queues exactly one `UpgradeFormatVersion` update.
#[test]
fn test_upgrade_table_version_v1_to_v2() {
    let table = make_v1_table();

    let upgraded = Transaction::new(&table)
        .upgrade_table_version(FormatVersion::V2)
        .unwrap();

    let expected = vec![TableUpdate::UpgradeFormatVersion {
        format_version: FormatVersion::V2,
    }];
    assert_eq!(expected, upgraded.updates);
}
/// Upgrading a V2 table to V2 is a no-op: nothing is queued.
#[test]
fn test_upgrade_table_version_v2_to_v2() {
    let table = make_v2_table();

    let unchanged = Transaction::new(&table)
        .upgrade_table_version(FormatVersion::V2)
        .unwrap();

    assert!(
        unchanged.updates.is_empty(),
        "Upgrade table to same version should not generate any updates"
    );
    assert!(
        unchanged.requirements.is_empty(),
        "Upgrade table to same version should not generate any requirements"
    );
}
/// Requesting V1 on a V2 table must be rejected.
#[test]
fn test_downgrade_table_version() {
    let table = make_v2_table();

    let result = Transaction::new(&table).upgrade_table_version(FormatVersion::V1);

    assert!(result.is_err(), "Downgrade table version should fail!");
}
/// `set_properties` queues a single `SetProperties` update with the given map.
#[test]
fn test_set_table_property() {
    let table = make_v2_table();
    let props = HashMap::from([("a".to_string(), "b".to_string())]);

    let tx = Transaction::new(&table)
        .set_properties(props.clone())
        .unwrap();

    let expected = vec![TableUpdate::SetProperties { updates: props }];
    assert_eq!(expected, tx.updates);
}
/// `remove_properties` queues a single `RemoveProperties` update with the given keys.
#[test]
fn test_remove_property() {
    let table = make_v2_table();
    let keys = vec!["a".to_string(), "b".to_string()];

    let tx = Transaction::new(&table)
        .remove_properties(keys.clone())
        .unwrap();

    let expected = vec![TableUpdate::RemoveProperties { removals: keys }];
    assert_eq!(expected, tx.updates);
}
/// Queuing the same kind of update twice on one transaction must fail.
#[test]
fn test_do_same_update_in_same_transaction() {
    let table = make_v2_table();

    let first = Transaction::new(&table)
        .remove_properties(vec!["a".to_string(), "b".to_string()])
        .unwrap();
    let second = first.remove_properties(vec!["c".to_string(), "d".to_string()]);

    assert!(
        second.is_err(),
        "Should not allow to do same kinds update in same transaction"
    );
}
}