1mod action;
54
55pub use action::*;
56mod append;
57mod expire_snapshots;
58mod snapshot;
59mod sort_order;
60mod update_location;
61mod update_properties;
62mod update_schema;
63mod update_statistics;
64mod upgrade_format_version;
65
66use std::sync::Arc;
67use std::time::Duration;
68
69use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWithContext};
70pub use update_schema::AddColumn;
71
72use crate::error::Result;
73use crate::spec::TableProperties;
74use crate::table::Table;
75use crate::transaction::action::BoxedTransactionAction;
76use crate::transaction::append::FastAppendAction;
77use crate::transaction::expire_snapshots::ExpireSnapshotsAction;
78use crate::transaction::sort_order::ReplaceSortOrderAction;
79use crate::transaction::update_location::UpdateLocationAction;
80use crate::transaction::update_properties::UpdatePropertiesAction;
81use crate::transaction::update_schema::UpdateSchemaAction;
82use crate::transaction::update_statistics::UpdateStatisticsAction;
83use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction;
84use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
85
86#[derive(Clone)]
88pub struct Transaction {
89 table: Table,
90 actions: Vec<BoxedTransactionAction>,
91}
92
93impl Transaction {
94 pub fn new(table: &Table) -> Self {
96 Self {
97 table: table.clone(),
98 actions: vec![],
99 }
100 }
101
102 fn update_table_metadata(table: Table, updates: &[TableUpdate]) -> Result<Table> {
103 let mut metadata_builder = table.metadata().clone().into_builder(None);
104 for update in updates {
105 metadata_builder = update.clone().apply(metadata_builder)?;
106 }
107
108 Ok(table.with_metadata(Arc::new(metadata_builder.build()?.metadata)))
109 }
110
111 fn apply(
114 table: Table,
115 mut action_commit: ActionCommit,
116 existing_updates: &mut Vec<TableUpdate>,
117 existing_requirements: &mut Vec<TableRequirement>,
118 ) -> Result<Table> {
119 let updates = action_commit.take_updates();
120 let requirements = action_commit.take_requirements();
121
122 for requirement in &requirements {
123 requirement.check(Some(table.metadata()))?;
124 }
125
126 let updated_table = Self::update_table_metadata(table, &updates)?;
127
128 existing_updates.extend(updates);
129 existing_requirements.extend(requirements);
130
131 Ok(updated_table)
132 }
133
134 pub fn upgrade_table_version(&self) -> UpgradeFormatVersionAction {
136 UpgradeFormatVersionAction::new()
137 }
138
139 pub fn update_table_properties(&self) -> UpdatePropertiesAction {
141 UpdatePropertiesAction::new()
142 }
143
144 pub fn update_schema(&self) -> UpdateSchemaAction {
146 UpdateSchemaAction::new()
147 }
148
149 pub fn fast_append(&self) -> FastAppendAction {
151 FastAppendAction::new()
152 }
153
154 pub fn replace_sort_order(&self) -> ReplaceSortOrderAction {
156 ReplaceSortOrderAction::new()
157 }
158
159 pub fn update_location(&self) -> UpdateLocationAction {
161 UpdateLocationAction::new()
162 }
163
164 pub fn update_statistics(&self) -> UpdateStatisticsAction {
166 UpdateStatisticsAction::new()
167 }
168
169 pub fn expire_snapshots(&self) -> ExpireSnapshotsAction {
171 ExpireSnapshotsAction::new()
172 }
173
174 pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
176 if self.actions.is_empty() {
177 return Ok(self.table);
179 }
180
181 let table_props = self.table.metadata().table_properties()?;
182
183 if table_props.encryption_key_id.is_some() {
185 return Err(Error::new(
186 ErrorKind::FeatureUnsupported,
187 "Cannot commit to an encrypted table: encrypted writes are not yet supported",
188 ));
189 }
190
191 let backoff = Self::build_backoff(table_props)?;
192 let tx = self;
193
194 (|mut tx: Transaction| async {
195 let result = tx.do_commit(catalog).await;
196 (tx, result)
197 })
198 .retry(backoff)
199 .sleep(tokio::time::sleep)
200 .context(tx)
201 .when(|e| e.retryable())
202 .await
203 .1
204 }
205
206 fn build_backoff(props: TableProperties) -> Result<ExponentialBackoff> {
207 Ok(ExponentialBuilder::new()
208 .with_min_delay(Duration::from_millis(props.commit_min_retry_wait_ms))
209 .with_max_delay(Duration::from_millis(props.commit_max_retry_wait_ms))
210 .with_total_delay(Some(Duration::from_millis(
211 props.commit_total_retry_timeout_ms,
212 )))
213 .with_max_times(props.commit_num_retries)
214 .with_factor(2.0)
215 .build())
216 }
217
218 async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result<Table> {
219 let refreshed = catalog.load_table(self.table.identifier()).await?;
220
221 if self.table.metadata() != refreshed.metadata()
222 || self.table.metadata_location() != refreshed.metadata_location()
223 {
224 self.table = refreshed.clone();
226 }
227
228 let mut current_table = self.table.clone();
229 let mut existing_updates: Vec<TableUpdate> = vec![];
230 let mut existing_requirements: Vec<TableRequirement> = vec![];
231
232 for action in &self.actions {
233 let action_commit = Arc::clone(action).commit(¤t_table).await?;
234 current_table = Self::apply(
236 current_table,
237 action_commit,
238 &mut existing_updates,
239 &mut existing_requirements,
240 )?;
241 }
242
243 let table_commit = TableCommit::builder()
244 .ident(self.table.identifier().to_owned())
245 .updates(existing_updates)
246 .requirements(existing_requirements)
247 .build();
248
249 catalog.update_table(table_commit).await
250 }
251}
252
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::fs::File;
    use std::io::BufReader;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU32, Ordering};

    use crate::catalog::MockCatalog;
    use crate::encryption::SensitiveBytes;
    use crate::encryption::kms::{KeyManagementClient, MemoryKeyManagementClient};
    use crate::io::FileIO;
    use crate::memory::tests::new_memory_catalog;
    use crate::spec::{
        DataContentType, DataFileBuilder, DataFileFormat, Literal, Struct, TableMetadata,
    };
    use crate::table::Table;
    use crate::test_utils::test_runtime;
    use crate::transaction::{ApplyTransactionAction, Transaction};
    use crate::{Catalog, Error, ErrorKind, TableCreation, TableIdent};

    /// Metadata location shared by every catalog-less fixture table.
    const TEST_METADATA_LOCATION: &str = "s3://bucket/test/location/metadata/v1.json";

    /// Deserializes the fixture `testdata/table_metadata/<file_name>` shipped with this crate.
    ///
    /// Panics if the file is missing or does not parse as `TableMetadata`.
    fn load_test_metadata(file_name: &str) -> TableMetadata {
        let path = format!(
            "{}/testdata/table_metadata/{}",
            env!("CARGO_MANIFEST_DIR"),
            file_name
        );
        let reader = BufReader::new(File::open(path).unwrap());
        serde_json::from_reader::<_, TableMetadata>(reader).unwrap()
    }

    /// Wraps `metadata` in an `ns1.test1` table backed by in-memory `FileIO`, with no catalog.
    fn make_test_table(metadata: TableMetadata) -> Table {
        Table::builder()
            .metadata(metadata)
            .metadata_location(TEST_METADATA_LOCATION)
            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
            .file_io(FileIO::new_with_memory())
            .runtime(test_runtime())
            .build()
            .unwrap()
    }

    /// Catalog-less table built from the `TableMetadataV1Valid.json` fixture.
    pub fn make_v1_table() -> Table {
        make_test_table(load_test_metadata("TableMetadataV1Valid.json"))
    }

    /// Catalog-less table built from the `TableMetadataV2Valid.json` fixture.
    pub fn make_v2_table() -> Table {
        make_test_table(load_test_metadata("TableMetadataV2Valid.json"))
    }

    /// Catalog-less table built from the `TableMetadataV2ValidMinimal.json` fixture.
    pub fn make_v2_minimal_table() -> Table {
        make_test_table(load_test_metadata("TableMetadataV2ValidMinimal.json"))
    }

    /// Creates a format-v3 table in `catalog`, using the schema, partition spec and sort order of
    /// the `TableMetadataV3ValidMinimal.json` fixture.
    ///
    /// A fresh, uniquely named namespace is created first, so repeated calls against the same
    /// catalog do not collide.
    pub(crate) async fn make_v3_minimal_table_in_catalog(catalog: &impl Catalog) -> Table {
        let table_ident =
            TableIdent::from_strs([format!("ns1-{}", uuid::Uuid::new_v4()), "test1".to_string()])
                .unwrap();

        catalog
            .create_namespace(table_ident.namespace(), HashMap::new())
            .await
            .unwrap();

        let base_metadata = load_test_metadata("TableMetadataV3ValidMinimal.json");

        let table_creation = TableCreation::builder()
            .schema((**base_metadata.current_schema()).clone())
            .partition_spec((**base_metadata.default_partition_spec()).clone())
            .sort_order((**base_metadata.default_sort_order()).clone())
            .name(table_ident.name().to_string())
            .format_version(crate::spec::FormatVersion::V3)
            .build();

        catalog
            .create_table(table_ident.namespace(), table_creation)
            .await
            .unwrap()
    }

    /// Returns the v2 fixture table configured for fast commit retries.
    ///
    /// The configuration is: 10 ms minimum wait, 100 ms maximum wait, 1000 ms total timeout,
    /// and `num_retries` retries.
    pub(super) fn setup_test_table(num_retries: &str) -> Table {
        let table = make_v2_table();

        let props = HashMap::from([
            ("commit.retry.min-wait-ms".to_string(), "10".to_string()),
            ("commit.retry.max-wait-ms".to_string(), "100".to_string()),
            (
                "commit.retry.total-timeout-ms".to_string(),
                "1000".to_string(),
            ),
            (
                "commit.retry.num-retries".to_string(),
                num_retries.to_string(),
            ),
        ]);

        let metadata = table
            .metadata()
            .clone()
            .into_builder(None)
            .set_properties(props)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        table.with_metadata(Arc::new(metadata))
    }

    /// Builds a transaction on `table` with a single property update attached.
    fn create_test_transaction(table: &Table) -> Transaction {
        let tx = Transaction::new(table);
        tx.update_table_properties()
            .set("test.key".to_string(), "test.value".to_string())
            .apply(tx)
            .unwrap()
    }

    /// Mock catalog whose `update_table` fails with a retryable commit conflict.
    ///
    /// With `Some(n)`, the first `n` calls fail and later calls succeed. With `None`, every call
    /// fails. The mock asserts that `update_table` is called exactly `expected_calls` times.
    fn setup_mock_catalog_with_retryable_errors(
        success_after_attempts: Option<u32>,
        expected_calls: usize,
    ) -> MockCatalog {
        let mut mock_catalog = MockCatalog::new();

        mock_catalog
            .expect_load_table()
            .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));

        let attempts = AtomicU32::new(0);
        mock_catalog
            .expect_update_table()
            .times(expected_calls)
            .returning_st(move |_| {
                // `fetch_add` returns the previous count, so `attempt` is 1-based.
                let attempt = attempts.fetch_add(1, Ordering::SeqCst) + 1;
                if matches!(success_after_attempts, Some(limit) if attempt > limit) {
                    Box::pin(async move { Ok(make_v2_table()) })
                } else {
                    Box::pin(async move {
                        Err(
                            Error::new(ErrorKind::CatalogCommitConflicts, "Commit conflict")
                                .with_retryable(true),
                        )
                    })
                }
            });

        mock_catalog
    }

    /// Mock catalog whose single expected `update_table` call fails with a non-retryable error.
    fn setup_mock_catalog_with_non_retryable_error() -> MockCatalog {
        let mut mock_catalog = MockCatalog::new();

        mock_catalog
            .expect_load_table()
            .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));

        mock_catalog
            .expect_update_table()
            .times(1)
            .returning_st(move |_| {
                Box::pin(async move {
                    Err(Error::new(ErrorKind::Unexpected, "Non-retryable error")
                        .with_retryable(false))
                })
            });

        mock_catalog
    }

    #[tokio::test]
    async fn test_commit_retryable_error() {
        let table = setup_test_table("3");
        let tx = create_test_transaction(&table);

        // Two conflicts followed by a success: three calls in total, within the 3-retry budget.
        let mock_catalog = setup_mock_catalog_with_retryable_errors(Some(2), 3);

        let result = tx.commit(&mock_catalog).await;

        assert!(result.is_ok(), "Transaction should eventually succeed");
    }

    #[tokio::test]
    async fn test_commit_non_retryable_error() {
        let table = setup_test_table("3");
        let tx = create_test_transaction(&table);
        let mock_catalog = setup_mock_catalog_with_non_retryable_error();

        let result = tx.commit(&mock_catalog).await;

        let err = result.expect_err("Transaction should fail immediately");
        assert_eq!(err.kind(), ErrorKind::Unexpected);
        assert_eq!(err.message(), "Non-retryable error");
        assert!(!err.retryable(), "Error should not be retryable");
    }

    #[tokio::test]
    async fn test_commit_max_retries_exceeded() {
        let table = setup_test_table("2");
        let tx = create_test_transaction(&table);

        // One initial attempt plus two retries, all conflicting.
        let mock_catalog = setup_mock_catalog_with_retryable_errors(None, 3);

        let result = tx.commit(&mock_catalog).await;

        let err = result.expect_err("Transaction should fail after max retries");
        assert_eq!(err.kind(), ErrorKind::CatalogCommitConflicts);
        assert_eq!(err.message(), "Commit conflict");
        assert!(err.retryable(), "Error should be retryable");
    }

    #[tokio::test]
    async fn test_transaction_snapshot_summary() {
        let catalog = new_memory_catalog().await;
        let table = make_v3_minimal_table_in_catalog(&catalog).await;

        // Builds a transaction that fast-appends one uniquely named data file.
        let mut file_seq = 0u32;
        let mut append_file = |table: &Table, record_count: u64, file_size: u64| {
            file_seq += 1;
            let file = DataFileBuilder::default()
                .content(DataContentType::Data)
                .file_path(format!("test/{file_seq}.parquet"))
                .file_format(DataFileFormat::Parquet)
                .file_size_in_bytes(file_size)
                .record_count(record_count)
                .partition(Struct::from_iter([Some(Literal::long(1))]))
                .partition_spec_id(0)
                .build()
                .unwrap();
            let tx = Transaction::new(table);
            tx.fast_append()
                .add_data_files(vec![file])
                .apply(tx)
                .unwrap()
        };

        let table = append_file(&table, 10, 100)
            .commit(&catalog)
            .await
            .unwrap();
        let table = append_file(&table, 20, 200)
            .commit(&catalog)
            .await
            .unwrap();

        let summary = &table
            .metadata()
            .current_snapshot()
            .unwrap()
            .summary()
            .additional_properties;

        // Totals accumulate across both appends.
        assert_eq!(summary.get("total-records").unwrap(), "30");
        assert_eq!(summary.get("total-data-files").unwrap(), "2");
        assert_eq!(summary.get("total-files-size").unwrap(), "300");
    }

    #[tokio::test]
    async fn test_commit_rejects_encrypted_table() {
        let metadata = load_test_metadata("TableMetadataV3ValidEncryption.json");

        let kms: Arc<dyn KeyManagementClient> = {
            let client = MemoryKeyManagementClient::new();
            client
                .add_master_key_bytes(
                    "master-1",
                    SensitiveBytes::new([
                        0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b,
                        0x0c, 0x0d, 0x0e, 0x0f,
                    ]),
                )
                .unwrap();
            Arc::new(client)
        };

        let table = Table::builder()
            .metadata(metadata)
            .metadata_location(TEST_METADATA_LOCATION)
            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
            .file_io(FileIO::new_with_memory())
            .kms_client(kms)
            .runtime(test_runtime())
            .build()
            .unwrap();

        let tx = create_test_transaction(&table);

        // The mock has no expectations, so any catalog call would panic. This checks that the
        // rejection happens before the catalog is touched.
        let mock_catalog = MockCatalog::new();
        let result = tx.commit(&mock_catalog).await;

        let err = result.unwrap_err();
        assert_eq!(err.kind(), ErrorKind::FeatureUnsupported);
        assert!(
            err.message()
                .contains("encrypted writes are not yet supported"),
            "unexpected error message: {}",
            err.message()
        );
    }
}
635
#[cfg(test)]
mod test_row_lineage {
    use crate::memory::tests::new_memory_catalog;
    use crate::spec::{
        DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, Struct,
    };
    use crate::transaction::tests::make_v3_minimal_table_in_catalog;
    use crate::transaction::{ApplyTransactionAction, Transaction};

    #[tokio::test]
    async fn test_fast_append_with_row_lineage() {
        /// Builds a 100-byte Parquet data file in partition `0` holding `record_count` rows.
        ///
        /// The path is derived from `record_count`, so two files with the same count would share
        /// a path.
        fn file_with_rows(record_count: u64) -> DataFile {
            DataFileBuilder::default()
                .content(DataContentType::Data)
                .file_path(format!("test/{record_count}.parquet"))
                .file_format(DataFileFormat::Parquet)
                .file_size_in_bytes(100)
                .record_count(record_count)
                .partition(Struct::from_iter([Some(Literal::long(0))]))
                .partition_spec_id(0)
                .build()
                .unwrap()
        }

        let catalog = new_memory_catalog().await;
        let table = make_v3_minimal_table_in_catalog(&catalog).await;

        // A freshly created v3 table has not handed out any row ids yet.
        assert_eq!(table.metadata().next_row_id(), 0);

        // First append: 30 rows, so this snapshot owns row ids starting at 0.
        let tx = Transaction::new(&table);
        let action = tx.fast_append().add_data_files(vec![file_with_rows(30)]);
        let tx = action.apply(tx).unwrap();
        let table = tx.commit(&catalog).await.unwrap();

        let snapshot = table.metadata().current_snapshot().unwrap();
        assert_eq!(snapshot.first_row_id(), Some(0));
        assert_eq!(table.metadata().next_row_id(), 30);

        let manifest_list = table.manifest_list_reader(snapshot).load().await.unwrap();
        assert_eq!(manifest_list.entries().len(), 1);
        assert_eq!(manifest_list.entries()[0].first_row_id, Some(0));

        // Second append: 17 + 11 rows, continuing from row id 30.
        let tx = Transaction::new(&table);
        let action = tx
            .fast_append()
            .add_data_files(vec![file_with_rows(17), file_with_rows(11)]);
        let tx = action.apply(tx).unwrap();
        let table = tx.commit(&catalog).await.unwrap();

        let snapshot = table.metadata().current_snapshot().unwrap();
        assert_eq!(snapshot.first_row_id(), Some(30));
        assert_eq!(table.metadata().next_row_id(), 30 + 17 + 11);

        // The manifest list now has the earlier manifest plus the new one, which starts at 30.
        let manifest_list = table.manifest_list_reader(snapshot).load().await.unwrap();
        assert_eq!(manifest_list.entries().len(), 2);
        assert_eq!(manifest_list.entries()[1].first_row_id, Some(30));
    }
}