mod action;

pub use action::*;
mod append;
mod snapshot;
mod sort_order;
mod update_location;
mod update_properties;
mod update_statistics;
mod upgrade_format_version;

use std::sync::Arc;
use std::time::Duration;

use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWithContext};

use crate::error::Result;
use crate::spec::TableProperties;
use crate::table::Table;
use crate::transaction::action::BoxedTransactionAction;
use crate::transaction::append::FastAppendAction;
use crate::transaction::sort_order::ReplaceSortOrderAction;
use crate::transaction::update_location::UpdateLocationAction;
use crate::transaction::update_properties::UpdatePropertiesAction;
use crate::transaction::update_statistics::UpdateStatisticsAction;
use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction;
use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};

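/// A table transaction.
///
/// A `Transaction` buffers update actions against a [`Table`] and commits them
/// to a [`Catalog`] in one call, retrying retryable commit errors according to
/// the table's `commit.retry.*` properties.
///
/// A minimal usage sketch, assuming a `table` and a `catalog` are already
/// available (the pattern mirrors the tests at the bottom of this file; the
/// `iceberg::` import path is assumed here):
///
/// ```ignore
/// use iceberg::transaction::{ApplyTransactionAction, Transaction};
///
/// let tx = Transaction::new(&table);
/// // Stage an action, then fold it back into the transaction.
/// let tx = tx
///     .update_table_properties()
///     .set("test.key".to_string(), "test.value".to_string())
///     .apply(tx)?;
/// // Commit all staged actions against the catalog.
/// let table = tx.commit(&catalog).await?;
/// ```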
#[derive(Clone)]
pub struct Transaction {
    table: Table,
    actions: Vec<BoxedTransactionAction>,
}

impl Transaction {
    /// Creates a new transaction operating on the given table.
    pub fn new(table: &Table) -> Self {
        Self {
            table: table.clone(),
            actions: vec![],
        }
    }

    fn update_table_metadata(table: Table, updates: &[TableUpdate]) -> Result<Table> {
        let mut metadata_builder = table.metadata().clone().into_builder(None);
        for update in updates {
            metadata_builder = update.clone().apply(metadata_builder)?;
        }

        Ok(table.with_metadata(Arc::new(metadata_builder.build()?.metadata)))
    }

    fn apply(
        table: Table,
        mut action_commit: ActionCommit,
        existing_updates: &mut Vec<TableUpdate>,
        existing_requirements: &mut Vec<TableRequirement>,
    ) -> Result<Table> {
        let updates = action_commit.take_updates();
        let requirements = action_commit.take_requirements();

        for requirement in &requirements {
            requirement.check(Some(table.metadata()))?;
        }

        let updated_table = Self::update_table_metadata(table, &updates)?;

        existing_updates.extend(updates);
        existing_requirements.extend(requirements);

        Ok(updated_table)
    }

    /// Creates an action to upgrade the table's format version.
    pub fn upgrade_table_version(&self) -> UpgradeFormatVersionAction {
        UpgradeFormatVersionAction::new()
    }

    /// Creates an action to update the table's properties.
    pub fn update_table_properties(&self) -> UpdatePropertiesAction {
        UpdatePropertiesAction::new()
    }

    /// Creates a fast append action to add data files to the table.
    pub fn fast_append(&self) -> FastAppendAction {
        FastAppendAction::new()
    }

    /// Creates an action to replace the table's sort order.
    pub fn replace_sort_order(&self) -> ReplaceSortOrderAction {
        ReplaceSortOrderAction::new()
    }

    /// Creates an action to update the table's location.
    pub fn update_location(&self) -> UpdateLocationAction {
        UpdateLocationAction::new()
    }

    /// Creates an action to update the table's statistics.
    pub fn update_statistics(&self) -> UpdateStatisticsAction {
        UpdateStatisticsAction::new()
    }

    /// Commits all staged actions to the catalog, retrying retryable errors
    /// according to the table's commit retry properties.
    pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
        if self.actions.is_empty() {
            return Ok(self.table);
        }

        let table_props =
            TableProperties::try_from(self.table.metadata().properties()).map_err(|e| {
                Error::new(ErrorKind::DataInvalid, "Invalid table properties").with_source(e)
            })?;

        let backoff = Self::build_backoff(table_props)?;
        let tx = self;

        // The retried closure takes ownership of the transaction and returns it
        // alongside the result so that `backon` can pass it back in as context
        // on the next attempt.
        (|mut tx: Transaction| async {
            let result = tx.do_commit(catalog).await;
            (tx, result)
        })
        .retry(backoff)
        .sleep(tokio::time::sleep)
        .context(tx)
        .when(|e| e.retryable())
        .await
        .1
    }

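    /// Builds the exponential backoff used for commit retries. As a sketch of
    /// the mapping (the property names are the ones exercised in the tests
    /// below): `commit.retry.min-wait-ms` and `commit.retry.max-wait-ms` bound
    /// each individual delay, `commit.retry.total-timeout-ms` caps the total
    /// time spent retrying, `commit.retry.num-retries` caps the number of
    /// attempts, and the delay grows by a factor of 2 between attempts.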
    fn build_backoff(props: TableProperties) -> Result<ExponentialBackoff> {
        Ok(ExponentialBuilder::new()
            .with_min_delay(Duration::from_millis(props.commit_min_retry_wait_ms))
            .with_max_delay(Duration::from_millis(props.commit_max_retry_wait_ms))
            .with_total_delay(Some(Duration::from_millis(
                props.commit_total_retry_timeout_ms,
            )))
            .with_max_times(props.commit_num_retries)
            .with_factor(2.0)
            .build())
    }

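    /// Performs a single commit attempt: loads the current table from the
    /// catalog, adopts it if the metadata has changed, re-applies all staged
    /// actions against that metadata, and sends the accumulated updates and
    /// requirements to the catalog.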
    async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result<Table> {
        let refreshed = catalog.load_table(self.table.identifier()).await?;

        if self.table.metadata() != refreshed.metadata()
            || self.table.metadata_location() != refreshed.metadata_location()
        {
            self.table = refreshed.clone();
        }

        let mut current_table = self.table.clone();
        let mut existing_updates: Vec<TableUpdate> = vec![];
        let mut existing_requirements: Vec<TableRequirement> = vec![];

        for action in &self.actions {
            let action_commit = Arc::clone(action).commit(&current_table).await?;
            current_table = Self::apply(
                current_table,
                action_commit,
                &mut existing_updates,
                &mut existing_requirements,
            )?;
        }

        let table_commit = TableCommit::builder()
            .ident(self.table.identifier().to_owned())
            .updates(existing_updates)
            .requirements(existing_requirements)
            .build();

        catalog.update_table(table_commit).await
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::fs::File;
    use std::io::BufReader;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU32, Ordering};

    use crate::catalog::MockCatalog;
    use crate::io::FileIOBuilder;
    use crate::spec::TableMetadata;
    use crate::table::Table;
    use crate::transaction::{ApplyTransactionAction, Transaction};
    use crate::{Catalog, Error, ErrorKind, TableCreation, TableIdent};

    pub fn make_v1_table() -> Table {
        let file = File::open(format!(
            "{}/testdata/table_metadata/{}",
            env!("CARGO_MANIFEST_DIR"),
            "TableMetadataV1Valid.json"
        ))
        .unwrap();
        let reader = BufReader::new(file);
        let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();

        Table::builder()
            .metadata(resp)
            .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
            .file_io(FileIOBuilder::new("memory").build().unwrap())
            .build()
            .unwrap()
    }

    pub fn make_v2_table() -> Table {
        let file = File::open(format!(
            "{}/testdata/table_metadata/{}",
            env!("CARGO_MANIFEST_DIR"),
            "TableMetadataV2Valid.json"
        ))
        .unwrap();
        let reader = BufReader::new(file);
        let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();

        Table::builder()
            .metadata(resp)
            .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
            .file_io(FileIOBuilder::new("memory").build().unwrap())
            .build()
            .unwrap()
    }

    pub fn make_v2_minimal_table() -> Table {
        let file = File::open(format!(
            "{}/testdata/table_metadata/{}",
            env!("CARGO_MANIFEST_DIR"),
            "TableMetadataV2ValidMinimal.json"
        ))
        .unwrap();
        let reader = BufReader::new(file);
        let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();

        Table::builder()
            .metadata(resp)
            .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
            .file_io(FileIOBuilder::new("memory").build().unwrap())
            .build()
            .unwrap()
    }

    pub(crate) async fn make_v3_minimal_table_in_catalog(catalog: &impl Catalog) -> Table {
        let table_ident =
            TableIdent::from_strs([format!("ns1-{}", uuid::Uuid::new_v4()), "test1".to_string()])
                .unwrap();

        catalog
            .create_namespace(table_ident.namespace(), HashMap::new())
            .await
            .unwrap();

        let file = File::open(format!(
            "{}/testdata/table_metadata/{}",
            env!("CARGO_MANIFEST_DIR"),
            "TableMetadataV3ValidMinimal.json"
        ))
        .unwrap();
        let reader = BufReader::new(file);
        let base_metadata = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();

        let table_creation = TableCreation::builder()
            .schema((**base_metadata.current_schema()).clone())
            .partition_spec((**base_metadata.default_partition_spec()).clone())
            .sort_order((**base_metadata.default_sort_order()).clone())
            .name(table_ident.name().to_string())
            .format_version(crate::spec::FormatVersion::V3)
            .build();

        catalog
            .create_table(table_ident.namespace(), table_creation)
            .await
            .unwrap()
    }

    pub(super) fn setup_test_table(num_retries: &str) -> Table {
        let table = make_v2_table();

        let mut props = HashMap::new();
        props.insert("commit.retry.min-wait-ms".to_string(), "10".to_string());
        props.insert("commit.retry.max-wait-ms".to_string(), "100".to_string());
        props.insert(
            "commit.retry.total-timeout-ms".to_string(),
            "1000".to_string(),
        );
        props.insert(
            "commit.retry.num-retries".to_string(),
            num_retries.to_string(),
        );

        let metadata = table
            .metadata()
            .clone()
            .into_builder(None)
            .set_properties(props)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        table.with_metadata(Arc::new(metadata))
    }

    fn create_test_transaction(table: &Table) -> Transaction {
        let tx = Transaction::new(table);
        tx.update_table_properties()
            .set("test.key".to_string(), "test.value".to_string())
            .apply(tx)
            .unwrap()
    }

    fn setup_mock_catalog_with_retryable_errors(
        success_after_attempts: Option<u32>,
        expected_calls: usize,
    ) -> MockCatalog {
        let mut mock_catalog = MockCatalog::new();

        mock_catalog
            .expect_load_table()
            .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));

        let attempts = AtomicU32::new(0);
        mock_catalog
            .expect_update_table()
            .times(expected_calls)
            .returning_st(move |_| {
                if let Some(success_after_attempts) = success_after_attempts {
                    attempts.fetch_add(1, Ordering::SeqCst);
                    if attempts.load(Ordering::SeqCst) <= success_after_attempts {
                        Box::pin(async move {
                            Err(
                                Error::new(ErrorKind::CatalogCommitConflicts, "Commit conflict")
                                    .with_retryable(true),
                            )
                        })
                    } else {
                        Box::pin(async move { Ok(make_v2_table()) })
                    }
                } else {
                    Box::pin(async move {
                        Err(
                            Error::new(ErrorKind::CatalogCommitConflicts, "Commit conflict")
                                .with_retryable(true),
                        )
                    })
                }
            });

        mock_catalog
    }

    fn setup_mock_catalog_with_non_retryable_error() -> MockCatalog {
        let mut mock_catalog = MockCatalog::new();

        mock_catalog
            .expect_load_table()
            .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));

        mock_catalog
            .expect_update_table()
            .times(1)
            .returning_st(move |_| {
                Box::pin(async move {
                    Err(Error::new(ErrorKind::Unexpected, "Non-retryable error")
                        .with_retryable(false))
                })
            });

        mock_catalog
    }

    #[tokio::test]
    async fn test_commit_retryable_error() {
        let table = setup_test_table("3");

        let tx = create_test_transaction(&table);

        let mock_catalog = setup_mock_catalog_with_retryable_errors(Some(2), 3);

        let result = tx.commit(&mock_catalog).await;

        assert!(result.is_ok(), "Transaction should eventually succeed");
    }

    #[tokio::test]
    async fn test_commit_non_retryable_error() {
        let table = setup_test_table("3");

        let tx = create_test_transaction(&table);

        let mock_catalog = setup_mock_catalog_with_non_retryable_error();

        let result = tx.commit(&mock_catalog).await;

        assert!(result.is_err(), "Transaction should fail immediately");
        if let Err(err) = result {
            assert_eq!(err.kind(), ErrorKind::Unexpected);
            assert_eq!(err.message(), "Non-retryable error");
            assert!(!err.retryable(), "Error should not be retryable");
        }
    }

    #[tokio::test]
    async fn test_commit_max_retries_exceeded() {
        let table = setup_test_table("2");

        let tx = create_test_transaction(&table);

        let mock_catalog = setup_mock_catalog_with_retryable_errors(None, 3);

        let result = tx.commit(&mock_catalog).await;

        assert!(result.is_err(), "Transaction should fail after max retries");
        if let Err(err) = result {
            assert_eq!(err.kind(), ErrorKind::CatalogCommitConflicts);
            assert_eq!(err.message(), "Commit conflict");
            assert!(err.retryable(), "Error should be retryable");
        }
    }
}

#[cfg(test)]
mod test_row_lineage {
    use crate::memory::tests::new_memory_catalog;
    use crate::spec::{
        DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, Struct,
    };
    use crate::transaction::tests::make_v3_minimal_table_in_catalog;
    use crate::transaction::{ApplyTransactionAction, Transaction};

    #[tokio::test]
    async fn test_fast_append_with_row_lineage() {
        fn file_with_rows(record_count: u64) -> DataFile {
            DataFileBuilder::default()
                .content(DataContentType::Data)
                .file_path(format!("test/{}.parquet", record_count))
                .file_format(DataFileFormat::Parquet)
                .file_size_in_bytes(100)
                .record_count(record_count)
                .partition(Struct::from_iter([Some(Literal::long(0))]))
                .partition_spec_id(0)
                .build()
                .unwrap()
        }
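
        // Row lineage bookkeeping verified below: each appended snapshot records
        // the first_row_id it starts at, and the table's next_row_id advances by
        // the total record count of the files added in that snapshot.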
        let catalog = new_memory_catalog().await;

        let table = make_v3_minimal_table_in_catalog(&catalog).await;

        assert_eq!(table.metadata().next_row_id(), 0);

        let tx = Transaction::new(&table);
        let data_file_30 = file_with_rows(30);
        let action = tx.fast_append().add_data_files(vec![data_file_30]);
        let tx = action.apply(tx).unwrap();
        let table = tx.commit(&catalog).await.unwrap();

        let snapshot = table.metadata().current_snapshot().unwrap();
        assert_eq!(snapshot.first_row_id(), Some(0));
        assert_eq!(table.metadata().next_row_id(), 30);

        let manifest_list = table
            .metadata()
            .current_snapshot()
            .unwrap()
            .load_manifest_list(table.file_io(), table.metadata())
            .await
            .unwrap();

        assert_eq!(manifest_list.entries().len(), 1);
        let manifest_file = &manifest_list.entries()[0];
        assert_eq!(manifest_file.first_row_id, Some(0));

        let tx = Transaction::new(&table);
        let data_file_17 = file_with_rows(17);
        let data_file_11 = file_with_rows(11);
        let action = tx
            .fast_append()
            .add_data_files(vec![data_file_17, data_file_11]);
        let tx = action.apply(tx).unwrap();
        let table = tx.commit(&catalog).await.unwrap();

        let snapshot = table.metadata().current_snapshot().unwrap();
        assert_eq!(snapshot.first_row_id(), Some(30));
        assert_eq!(table.metadata().next_row_id(), 30 + 17 + 11);

        let manifest_list = table
            .metadata()
            .current_snapshot()
            .unwrap()
            .load_manifest_list(table.file_io(), table.metadata())
            .await
            .unwrap();
        assert_eq!(manifest_list.entries().len(), 2);
        let manifest_file = &manifest_list.entries()[1];
        assert_eq!(manifest_file.first_row_id, Some(30));
    }
589}