1mod action;
54
55pub use action::*;
56mod append;
57mod snapshot;
58mod sort_order;
59mod update_location;
60mod update_properties;
61mod update_statistics;
62mod upgrade_format_version;
63
64use std::sync::Arc;
65use std::time::Duration;
66
67use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWithContext};
68
69use crate::error::Result;
70use crate::spec::TableProperties;
71use crate::table::Table;
72use crate::transaction::action::BoxedTransactionAction;
73use crate::transaction::append::FastAppendAction;
74use crate::transaction::sort_order::ReplaceSortOrderAction;
75use crate::transaction::update_location::UpdateLocationAction;
76use crate::transaction::update_properties::UpdatePropertiesAction;
77use crate::transaction::update_statistics::UpdateStatisticsAction;
78use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction;
79use crate::{Catalog, TableCommit, TableRequirement, TableUpdate};
80
81#[derive(Clone)]
83pub struct Transaction {
84 table: Table,
85 actions: Vec<BoxedTransactionAction>,
86}
87
88impl Transaction {
89 pub fn new(table: &Table) -> Self {
91 Self {
92 table: table.clone(),
93 actions: vec![],
94 }
95 }
96
97 fn update_table_metadata(table: Table, updates: &[TableUpdate]) -> Result<Table> {
98 let mut metadata_builder = table.metadata().clone().into_builder(None);
99 for update in updates {
100 metadata_builder = update.clone().apply(metadata_builder)?;
101 }
102
103 Ok(table.with_metadata(Arc::new(metadata_builder.build()?.metadata)))
104 }
105
106 fn apply(
109 table: Table,
110 mut action_commit: ActionCommit,
111 existing_updates: &mut Vec<TableUpdate>,
112 existing_requirements: &mut Vec<TableRequirement>,
113 ) -> Result<Table> {
114 let updates = action_commit.take_updates();
115 let requirements = action_commit.take_requirements();
116
117 for requirement in &requirements {
118 requirement.check(Some(table.metadata()))?;
119 }
120
121 let updated_table = Self::update_table_metadata(table, &updates)?;
122
123 existing_updates.extend(updates);
124 existing_requirements.extend(requirements);
125
126 Ok(updated_table)
127 }
128
129 pub fn upgrade_table_version(&self) -> UpgradeFormatVersionAction {
131 UpgradeFormatVersionAction::new()
132 }
133
134 pub fn update_table_properties(&self) -> UpdatePropertiesAction {
136 UpdatePropertiesAction::new()
137 }
138
139 pub fn fast_append(&self) -> FastAppendAction {
141 FastAppendAction::new()
142 }
143
144 pub fn replace_sort_order(&self) -> ReplaceSortOrderAction {
146 ReplaceSortOrderAction::new()
147 }
148
149 pub fn update_location(&self) -> UpdateLocationAction {
151 UpdateLocationAction::new()
152 }
153
154 pub fn update_statistics(&self) -> UpdateStatisticsAction {
156 UpdateStatisticsAction::new()
157 }
158
159 pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
161 if self.actions.is_empty() {
162 return Ok(self.table);
164 }
165
166 let table_props = self.table.metadata().table_properties()?;
167
168 let backoff = Self::build_backoff(table_props)?;
169 let tx = self;
170
171 (|mut tx: Transaction| async {
172 let result = tx.do_commit(catalog).await;
173 (tx, result)
174 })
175 .retry(backoff)
176 .sleep(tokio::time::sleep)
177 .context(tx)
178 .when(|e| e.retryable())
179 .await
180 .1
181 }
182
183 fn build_backoff(props: TableProperties) -> Result<ExponentialBackoff> {
184 Ok(ExponentialBuilder::new()
185 .with_min_delay(Duration::from_millis(props.commit_min_retry_wait_ms))
186 .with_max_delay(Duration::from_millis(props.commit_max_retry_wait_ms))
187 .with_total_delay(Some(Duration::from_millis(
188 props.commit_total_retry_timeout_ms,
189 )))
190 .with_max_times(props.commit_num_retries)
191 .with_factor(2.0)
192 .build())
193 }
194
195 async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result<Table> {
196 let refreshed = catalog.load_table(self.table.identifier()).await?;
197
198 if self.table.metadata() != refreshed.metadata()
199 || self.table.metadata_location() != refreshed.metadata_location()
200 {
201 self.table = refreshed.clone();
203 }
204
205 let mut current_table = self.table.clone();
206 let mut existing_updates: Vec<TableUpdate> = vec![];
207 let mut existing_requirements: Vec<TableRequirement> = vec![];
208
209 for action in &self.actions {
210 let action_commit = Arc::clone(action).commit(¤t_table).await?;
211 current_table = Self::apply(
213 current_table,
214 action_commit,
215 &mut existing_updates,
216 &mut existing_requirements,
217 )?;
218 }
219
220 let table_commit = TableCommit::builder()
221 .ident(self.table.identifier().to_owned())
222 .updates(existing_updates)
223 .requirements(existing_requirements)
224 .build();
225
226 catalog.update_table(table_commit).await
227 }
228}
229
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::fs::File;
    use std::io::BufReader;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU32, Ordering};

    use crate::catalog::MockCatalog;
    use crate::io::FileIOBuilder;
    use crate::spec::TableMetadata;
    use crate::table::Table;
    use crate::transaction::{ApplyTransactionAction, Transaction};
    use crate::{Catalog, Error, ErrorKind, TableCreation, TableIdent};

    /// Deserializes the table metadata fixture `file_name` from
    /// `testdata/table_metadata` under the crate's manifest directory.
    fn read_metadata_fixture(file_name: &str) -> TableMetadata {
        let path = format!(
            "{}/testdata/table_metadata/{}",
            env!("CARGO_MANIFEST_DIR"),
            file_name
        );
        let reader = BufReader::new(File::open(path).unwrap());
        serde_json::from_reader::<_, TableMetadata>(reader).unwrap()
    }

    /// Builds the table `ns1.test1`, backed by the `memory` file IO, from the
    /// metadata fixture `file_name`.
    fn table_from_fixture(file_name: &str) -> Table {
        Table::builder()
            .metadata(read_metadata_fixture(file_name))
            .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
            .file_io(FileIOBuilder::new("memory").build().unwrap())
            .build()
            .unwrap()
    }

    /// Table built from the `TableMetadataV1Valid.json` fixture.
    pub fn make_v1_table() -> Table {
        table_from_fixture("TableMetadataV1Valid.json")
    }

    /// Table built from the `TableMetadataV2Valid.json` fixture.
    pub fn make_v2_table() -> Table {
        table_from_fixture("TableMetadataV2Valid.json")
    }

    /// Table built from the `TableMetadataV2ValidMinimal.json` fixture.
    pub fn make_v2_minimal_table() -> Table {
        table_from_fixture("TableMetadataV2ValidMinimal.json")
    }

    /// Creates a format-version-3 table in `catalog`, inside a freshly created
    /// namespace with a random name, reusing the schema, partition spec and sort
    /// order of the `TableMetadataV3ValidMinimal.json` fixture.
    pub(crate) async fn make_v3_minimal_table_in_catalog(catalog: &impl Catalog) -> Table {
        let namespace_name = format!("ns1-{}", uuid::Uuid::new_v4());
        let table_ident = TableIdent::from_strs([namespace_name, "test1".to_string()]).unwrap();

        catalog
            .create_namespace(table_ident.namespace(), HashMap::new())
            .await
            .unwrap();

        let base_metadata = read_metadata_fixture("TableMetadataV3ValidMinimal.json");

        let table_creation = TableCreation::builder()
            .schema((**base_metadata.current_schema()).clone())
            .partition_spec((**base_metadata.default_partition_spec()).clone())
            .sort_order((**base_metadata.default_sort_order()).clone())
            .name(table_ident.name().to_string())
            .format_version(crate::spec::FormatVersion::V3)
            .build();

        catalog
            .create_table(table_ident.namespace(), table_creation)
            .await
            .unwrap()
    }

    /// Returns the v2 fixture table with short commit-retry timings and
    /// `commit.retry.num-retries` set to `num_retries`.
    pub(super) fn setup_test_table(num_retries: &str) -> Table {
        let table = make_v2_table();

        // Small waits keep the retry tests fast.
        let retry_props = HashMap::from([
            ("commit.retry.min-wait-ms".to_string(), "10".to_string()),
            ("commit.retry.max-wait-ms".to_string(), "100".to_string()),
            (
                "commit.retry.total-timeout-ms".to_string(),
                "1000".to_string(),
            ),
            (
                "commit.retry.num-retries".to_string(),
                num_retries.to_string(),
            ),
        ]);

        let build_result = table
            .metadata()
            .clone()
            .into_builder(None)
            .set_properties(retry_props)
            .unwrap()
            .build()
            .unwrap();

        table.with_metadata(Arc::new(build_result.metadata))
    }

    /// Returns a transaction on `table` holding a single property update.
    fn create_test_transaction(table: &Table) -> Transaction {
        let tx = Transaction::new(table);
        let action = tx
            .update_table_properties()
            .set("test.key".to_string(), "test.value".to_string());
        action.apply(tx).unwrap()
    }

    /// Returns a mock catalog whose `update_table` fails with a retryable
    /// commit conflict.
    ///
    /// With `Some(n)` the first `n` calls fail and later calls succeed; with
    /// `None` every call fails. `update_table` must be called exactly
    /// `expected_calls` times.
    fn setup_mock_catalog_with_retryable_errors(
        success_after_attempts: Option<u32>,
        expected_calls: usize,
    ) -> MockCatalog {
        fn commit_conflict() -> Error {
            Error::new(ErrorKind::CatalogCommitConflicts, "Commit conflict").with_retryable(true)
        }

        let mut catalog = MockCatalog::new();

        catalog
            .expect_load_table()
            .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));

        let attempts = AtomicU32::new(0);
        catalog
            .expect_update_table()
            .times(expected_calls)
            .returning_st(move |_| {
                // `fetch_add` yields the number of calls made before this one,
                // so the call fails while that count is below the threshold.
                let should_fail = match success_after_attempts {
                    Some(threshold) => attempts.fetch_add(1, Ordering::SeqCst) < threshold,
                    None => true,
                };

                if should_fail {
                    Box::pin(async move { Err(commit_conflict()) })
                } else {
                    Box::pin(async move { Ok(make_v2_table()) })
                }
            });

        catalog
    }

    /// Returns a mock catalog whose `update_table` must be called exactly once
    /// and fails with a non-retryable error.
    fn setup_mock_catalog_with_non_retryable_error() -> MockCatalog {
        let mut catalog = MockCatalog::new();

        catalog
            .expect_load_table()
            .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));

        catalog
            .expect_update_table()
            .times(1)
            .returning_st(move |_| {
                Box::pin(async move {
                    let failure = Error::new(ErrorKind::Unexpected, "Non-retryable error")
                        .with_retryable(false);
                    Err(failure)
                })
            });

        catalog
    }

    #[tokio::test]
    async fn test_commit_retryable_error() {
        // Three retries allowed; the catalog fails twice, then succeeds.
        let table = setup_test_table("3");
        let tx = create_test_transaction(&table);
        let mock_catalog = setup_mock_catalog_with_retryable_errors(Some(2), 3);

        let result = tx.commit(&mock_catalog).await;

        assert!(result.is_ok(), "Transaction should eventually succeed");
    }

    #[tokio::test]
    async fn test_commit_non_retryable_error() {
        let table = setup_test_table("3");
        let tx = create_test_transaction(&table);
        let mock_catalog = setup_mock_catalog_with_non_retryable_error();

        let result = tx.commit(&mock_catalog).await;

        match result {
            Ok(_) => panic!("Transaction should fail immediately"),
            Err(err) => {
                assert_eq!(err.kind(), ErrorKind::Unexpected);
                assert_eq!(err.message(), "Non-retryable error");
                assert!(!err.retryable(), "Error should not be retryable");
            }
        }
    }

    #[tokio::test]
    async fn test_commit_max_retries_exceeded() {
        // Two retries allowed and the catalog always fails: one initial attempt
        // plus two retries gives three `update_table` calls.
        let table = setup_test_table("2");
        let tx = create_test_transaction(&table);
        let mock_catalog = setup_mock_catalog_with_retryable_errors(None, 3);

        let result = tx.commit(&mock_catalog).await;

        match result {
            Ok(_) => panic!("Transaction should fail after max retries"),
            Err(err) => {
                assert_eq!(err.kind(), ErrorKind::CatalogCommitConflicts);
                assert_eq!(err.message(), "Commit conflict");
                assert!(err.retryable(), "Error should be retryable");
            }
        }
    }
}
502
#[cfg(test)]
mod test_row_lineage {
    use crate::memory::tests::new_memory_catalog;
    use crate::spec::{
        DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, Struct,
    };
    use crate::transaction::tests::make_v3_minimal_table_in_catalog;
    use crate::transaction::{ApplyTransactionAction, Transaction};

    /// Two consecutive fast appends on a v3 table must advance the table's
    /// `next_row_id` by the appended record counts and stamp each new snapshot
    /// and manifest with the row id it starts at.
    #[tokio::test]
    async fn test_fast_append_with_row_lineage() {
        /// Builds a Parquet data file descriptor reporting `record_count` rows.
        fn file_with_rows(record_count: u64) -> DataFile {
            DataFileBuilder::default()
                .content(DataContentType::Data)
                .file_path(format!("test/{record_count}.parquet"))
                .file_format(DataFileFormat::Parquet)
                .file_size_in_bytes(100)
                .record_count(record_count)
                .partition(Struct::from_iter([Some(Literal::long(0))]))
                .partition_spec_id(0)
                .build()
                .unwrap()
        }

        let catalog = new_memory_catalog().await;
        let table = make_v3_minimal_table_in_catalog(&catalog).await;

        // A freshly created table has not assigned any row ids yet.
        assert_eq!(table.metadata().next_row_id(), 0);

        // First commit: one file with 30 rows.
        let tx = Transaction::new(&table);
        let tx = tx
            .fast_append()
            .add_data_files(vec![file_with_rows(30)])
            .apply(tx)
            .unwrap();
        let table = tx.commit(&catalog).await.unwrap();

        let snapshot = table.metadata().current_snapshot().unwrap();
        assert_eq!(snapshot.first_row_id(), Some(0));
        assert_eq!(table.metadata().next_row_id(), 30);

        let manifest_list = snapshot
            .load_manifest_list(table.file_io(), table.metadata())
            .await
            .unwrap();
        assert_eq!(manifest_list.entries().len(), 1);
        assert_eq!(manifest_list.entries()[0].first_row_id, Some(0));

        // Second commit: two files with 17 and 11 rows.
        let tx = Transaction::new(&table);
        let tx = tx
            .fast_append()
            .add_data_files(vec![file_with_rows(17), file_with_rows(11)])
            .apply(tx)
            .unwrap();
        let table = tx.commit(&catalog).await.unwrap();

        let snapshot = table.metadata().current_snapshot().unwrap();
        assert_eq!(snapshot.first_row_id(), Some(30));
        assert_eq!(table.metadata().next_row_id(), 30 + 17 + 11);

        let manifest_list = snapshot
            .load_manifest_list(table.file_io(), table.metadata())
            .await
            .unwrap();
        assert_eq!(manifest_list.entries().len(), 2);
        assert_eq!(manifest_list.entries()[1].first_row_id, Some(30));
    }
}