1mod action;
54
55pub use action::*;
56mod append;
57mod snapshot;
58mod sort_order;
59mod update_location;
60mod update_properties;
61mod update_schema;
62mod update_statistics;
63mod upgrade_format_version;
64
65use std::sync::Arc;
66use std::time::Duration;
67
68use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWithContext};
69pub use update_schema::AddColumn;
70
71use crate::error::Result;
72use crate::spec::TableProperties;
73use crate::table::Table;
74use crate::transaction::action::BoxedTransactionAction;
75use crate::transaction::append::FastAppendAction;
76use crate::transaction::sort_order::ReplaceSortOrderAction;
77use crate::transaction::update_location::UpdateLocationAction;
78use crate::transaction::update_properties::UpdatePropertiesAction;
79use crate::transaction::update_schema::UpdateSchemaAction;
80use crate::transaction::update_statistics::UpdateStatisticsAction;
81use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction;
82use crate::{Catalog, TableCommit, TableRequirement, TableUpdate};
83
84#[derive(Clone)]
86pub struct Transaction {
87 table: Table,
88 actions: Vec<BoxedTransactionAction>,
89}
90
91impl Transaction {
92 pub fn new(table: &Table) -> Self {
94 Self {
95 table: table.clone(),
96 actions: vec![],
97 }
98 }
99
100 fn update_table_metadata(table: Table, updates: &[TableUpdate]) -> Result<Table> {
101 let mut metadata_builder = table.metadata().clone().into_builder(None);
102 for update in updates {
103 metadata_builder = update.clone().apply(metadata_builder)?;
104 }
105
106 Ok(table.with_metadata(Arc::new(metadata_builder.build()?.metadata)))
107 }
108
109 fn apply(
112 table: Table,
113 mut action_commit: ActionCommit,
114 existing_updates: &mut Vec<TableUpdate>,
115 existing_requirements: &mut Vec<TableRequirement>,
116 ) -> Result<Table> {
117 let updates = action_commit.take_updates();
118 let requirements = action_commit.take_requirements();
119
120 for requirement in &requirements {
121 requirement.check(Some(table.metadata()))?;
122 }
123
124 let updated_table = Self::update_table_metadata(table, &updates)?;
125
126 existing_updates.extend(updates);
127 existing_requirements.extend(requirements);
128
129 Ok(updated_table)
130 }
131
132 pub fn upgrade_table_version(&self) -> UpgradeFormatVersionAction {
134 UpgradeFormatVersionAction::new()
135 }
136
137 pub fn update_table_properties(&self) -> UpdatePropertiesAction {
139 UpdatePropertiesAction::new()
140 }
141
142 pub fn update_schema(&self) -> UpdateSchemaAction {
144 UpdateSchemaAction::new()
145 }
146
147 pub fn fast_append(&self) -> FastAppendAction {
149 FastAppendAction::new()
150 }
151
152 pub fn replace_sort_order(&self) -> ReplaceSortOrderAction {
154 ReplaceSortOrderAction::new()
155 }
156
157 pub fn update_location(&self) -> UpdateLocationAction {
159 UpdateLocationAction::new()
160 }
161
162 pub fn update_statistics(&self) -> UpdateStatisticsAction {
164 UpdateStatisticsAction::new()
165 }
166
167 pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
169 if self.actions.is_empty() {
170 return Ok(self.table);
172 }
173
174 let table_props = self.table.metadata().table_properties()?;
175
176 let backoff = Self::build_backoff(table_props)?;
177 let tx = self;
178
179 (|mut tx: Transaction| async {
180 let result = tx.do_commit(catalog).await;
181 (tx, result)
182 })
183 .retry(backoff)
184 .sleep(tokio::time::sleep)
185 .context(tx)
186 .when(|e| e.retryable())
187 .await
188 .1
189 }
190
191 fn build_backoff(props: TableProperties) -> Result<ExponentialBackoff> {
192 Ok(ExponentialBuilder::new()
193 .with_min_delay(Duration::from_millis(props.commit_min_retry_wait_ms))
194 .with_max_delay(Duration::from_millis(props.commit_max_retry_wait_ms))
195 .with_total_delay(Some(Duration::from_millis(
196 props.commit_total_retry_timeout_ms,
197 )))
198 .with_max_times(props.commit_num_retries)
199 .with_factor(2.0)
200 .build())
201 }
202
203 async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result<Table> {
204 let refreshed = catalog.load_table(self.table.identifier()).await?;
205
206 if self.table.metadata() != refreshed.metadata()
207 || self.table.metadata_location() != refreshed.metadata_location()
208 {
209 self.table = refreshed.clone();
211 }
212
213 let mut current_table = self.table.clone();
214 let mut existing_updates: Vec<TableUpdate> = vec![];
215 let mut existing_requirements: Vec<TableRequirement> = vec![];
216
217 for action in &self.actions {
218 let action_commit = Arc::clone(action).commit(¤t_table).await?;
219 current_table = Self::apply(
221 current_table,
222 action_commit,
223 &mut existing_updates,
224 &mut existing_requirements,
225 )?;
226 }
227
228 let table_commit = TableCommit::builder()
229 .ident(self.table.identifier().to_owned())
230 .updates(existing_updates)
231 .requirements(existing_requirements)
232 .build();
233
234 catalog.update_table(table_commit).await
235 }
236}
237
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::fs::File;
    use std::io::BufReader;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU32, Ordering};

    use crate::catalog::MockCatalog;
    use crate::io::FileIO;
    use crate::memory::tests::new_memory_catalog;
    use crate::spec::{
        DataContentType, DataFileBuilder, DataFileFormat, Literal, Struct, TableMetadata,
    };
    use crate::table::Table;
    use crate::test_utils::test_runtime;
    use crate::transaction::{ApplyTransactionAction, Transaction};
    use crate::{Catalog, Error, ErrorKind, TableCreation, TableIdent};

    /// Reads and deserializes `testdata/table_metadata/<file_name>` from the
    /// crate root.
    ///
    /// Panics, naming the offending path, if the file cannot be opened or does
    /// not deserialize into `TableMetadata`.
    fn load_test_metadata(file_name: &str) -> TableMetadata {
        let path = format!(
            "{}/testdata/table_metadata/{}",
            env!("CARGO_MANIFEST_DIR"),
            file_name
        );
        let file = File::open(&path).unwrap_or_else(|e| panic!("failed to open {path}: {e}"));
        serde_json::from_reader(BufReader::new(file))
            .unwrap_or_else(|e| panic!("failed to parse {path}: {e}"))
    }

    /// Builds a `Table` identified as `ns1.test1`, backed by in-memory
    /// `FileIO`, whose metadata is loaded from the fixture `file_name`.
    fn make_table_from_fixture(file_name: &str) -> Table {
        Table::builder()
            .metadata(load_test_metadata(file_name))
            .metadata_location("s3://bucket/test/location/metadata/v1.json")
            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
            .file_io(FileIO::new_with_memory())
            .runtime(test_runtime())
            .build()
            .unwrap()
    }

    /// Test table built from `TableMetadataV1Valid.json`.
    pub fn make_v1_table() -> Table {
        make_table_from_fixture("TableMetadataV1Valid.json")
    }

    /// Test table built from `TableMetadataV2Valid.json`.
    pub fn make_v2_table() -> Table {
        make_table_from_fixture("TableMetadataV2Valid.json")
    }

    /// Test table built from `TableMetadataV2ValidMinimal.json`.
    pub fn make_v2_minimal_table() -> Table {
        make_table_from_fixture("TableMetadataV2ValidMinimal.json")
    }

    /// Creates a uniquely named namespace in `catalog` and a format-v3 table
    /// `test1` inside it.
    ///
    /// The table's schema, partition spec and sort order are taken from
    /// `TableMetadataV3ValidMinimal.json`.
    pub(crate) async fn make_v3_minimal_table_in_catalog(catalog: &impl Catalog) -> Table {
        let table_ident =
            TableIdent::from_strs([format!("ns1-{}", uuid::Uuid::new_v4()), "test1".to_string()])
                .unwrap();

        catalog
            .create_namespace(table_ident.namespace(), HashMap::new())
            .await
            .unwrap();

        let base_metadata = load_test_metadata("TableMetadataV3ValidMinimal.json");

        let table_creation = TableCreation::builder()
            .schema((**base_metadata.current_schema()).clone())
            .partition_spec((**base_metadata.default_partition_spec()).clone())
            .sort_order((**base_metadata.default_sort_order()).clone())
            .name(table_ident.name().to_string())
            .format_version(crate::spec::FormatVersion::V3)
            .build();

        catalog
            .create_table(table_ident.namespace(), table_creation)
            .await
            .unwrap()
    }

    /// Returns the v2 test table with `commit.retry.*` properties set.
    ///
    /// Uses a min wait of 10, a max wait of 100, a total timeout of 1000 (all
    /// `-ms` keys), and `num_retries` retries.
    pub(super) fn setup_test_table(num_retries: &str) -> Table {
        let table = make_v2_table();

        let props = HashMap::from([
            ("commit.retry.min-wait-ms".to_string(), "10".to_string()),
            ("commit.retry.max-wait-ms".to_string(), "100".to_string()),
            (
                "commit.retry.total-timeout-ms".to_string(),
                "1000".to_string(),
            ),
            (
                "commit.retry.num-retries".to_string(),
                num_retries.to_string(),
            ),
        ]);

        let metadata = table
            .metadata()
            .clone()
            .into_builder(None)
            .set_properties(props)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        table.with_metadata(Arc::new(metadata))
    }

    /// Transaction over `table` carrying one property update
    /// (`test.key` = `test.value`).
    fn create_test_transaction(table: &Table) -> Transaction {
        let tx = Transaction::new(table);
        tx.update_table_properties()
            .set("test.key".to_string(), "test.value".to_string())
            .apply(tx)
            .unwrap()
    }

    /// Mock catalog for retryable-error tests.
    ///
    /// `load_table` always returns the v2 test table. `update_table` must be
    /// called exactly `expected_calls` times.
    ///
    /// With `Some(n)`, the first `n` updates fail with a retryable
    /// `CatalogCommitConflicts` error and later updates succeed. With `None`,
    /// every update fails that way.
    fn setup_mock_catalog_with_retryable_errors(
        success_after_attempts: Option<u32>,
        expected_calls: usize,
    ) -> MockCatalog {
        let mut mock_catalog = MockCatalog::new();

        mock_catalog
            .expect_load_table()
            .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));

        let attempts = AtomicU32::new(0);
        mock_catalog
            .expect_update_table()
            .times(expected_calls)
            .returning_st(move |_| {
                // `fetch_add` returns the previous count, so this is the 1-based
                // index of this call, obtained in a single atomic operation.
                let attempt = attempts.fetch_add(1, Ordering::SeqCst) + 1;
                let should_fail = match success_after_attempts {
                    Some(n) => attempt <= n,
                    None => true,
                };

                if should_fail {
                    Box::pin(async move {
                        Err(
                            Error::new(ErrorKind::CatalogCommitConflicts, "Commit conflict")
                                .with_retryable(true),
                        )
                    })
                } else {
                    Box::pin(async move { Ok(make_v2_table()) })
                }
            });

        mock_catalog
    }

    /// Mock catalog for the non-retryable-error test.
    ///
    /// `load_table` returns the v2 test table. The single expected
    /// `update_table` call fails with a non-retryable `Unexpected` error.
    fn setup_mock_catalog_with_non_retryable_error() -> MockCatalog {
        let mut mock_catalog = MockCatalog::new();

        mock_catalog
            .expect_load_table()
            .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));

        mock_catalog
            .expect_update_table()
            .times(1)
            .returning_st(move |_| {
                Box::pin(async move {
                    Err(Error::new(ErrorKind::Unexpected, "Non-retryable error")
                        .with_retryable(false))
                })
            });

        mock_catalog
    }

    #[tokio::test]
    async fn test_commit_retryable_error() {
        let table = setup_test_table("3");
        let tx = create_test_transaction(&table);

        // Two conflicts, then success on the third call.
        let mock_catalog = setup_mock_catalog_with_retryable_errors(Some(2), 3);

        let result = tx.commit(&mock_catalog).await;

        assert!(result.is_ok(), "Transaction should eventually succeed");
    }

    #[tokio::test]
    async fn test_commit_non_retryable_error() {
        let table = setup_test_table("3");
        let tx = create_test_transaction(&table);

        let mock_catalog = setup_mock_catalog_with_non_retryable_error();

        let result = tx.commit(&mock_catalog).await;

        assert!(result.is_err(), "Transaction should fail immediately");
        if let Err(err) = result {
            assert_eq!(err.kind(), ErrorKind::Unexpected);
            assert_eq!(err.message(), "Non-retryable error");
            assert!(!err.retryable(), "Error should not be retryable");
        }
    }

    #[tokio::test]
    async fn test_commit_max_retries_exceeded() {
        let table = setup_test_table("2");
        let tx = create_test_transaction(&table);

        // Every call conflicts: one initial attempt plus two retries.
        let mock_catalog = setup_mock_catalog_with_retryable_errors(None, 3);

        let result = tx.commit(&mock_catalog).await;

        assert!(result.is_err(), "Transaction should fail after max retries");
        if let Err(err) = result {
            assert_eq!(err.kind(), ErrorKind::CatalogCommitConflicts);
            assert_eq!(err.message(), "Commit conflict");
            assert!(err.retryable(), "Error should be retryable");
        }
    }

    #[tokio::test]
    async fn test_transaction_snapshot_summary() {
        let catalog = new_memory_catalog().await;
        let table = make_v3_minimal_table_in_catalog(&catalog).await;

        let mut file_seq = 0u32;
        let mut append_file = |table: &crate::table::Table, record_count: u64, file_size: u64| {
            file_seq += 1;
            let file = DataFileBuilder::default()
                .content(DataContentType::Data)
                .file_path(format!("test/{file_seq}.parquet"))
                .file_format(DataFileFormat::Parquet)
                .file_size_in_bytes(file_size)
                .record_count(record_count)
                .partition(Struct::from_iter([Some(Literal::long(1))]))
                .partition_spec_id(0)
                .build()
                .unwrap();
            let tx = Transaction::new(table);
            tx.fast_append()
                .add_data_files(vec![file])
                .apply(tx)
                .unwrap()
        };

        let table = append_file(&table, 10, 100)
            .commit(&catalog)
            .await
            .unwrap();
        let table = append_file(&table, 20, 200)
            .commit(&catalog)
            .await
            .unwrap();

        let summary = &table
            .metadata()
            .current_snapshot()
            .unwrap()
            .summary()
            .additional_properties;

        assert_eq!(summary.get("total-records").unwrap(), "30");
        assert_eq!(summary.get("total-data-files").unwrap(), "2");
        assert_eq!(summary.get("total-files-size").unwrap(), "300");
    }
}
563
#[cfg(test)]
mod test_row_lineage {
    use crate::memory::tests::new_memory_catalog;
    use crate::spec::{
        DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, Struct,
    };
    use crate::transaction::tests::make_v3_minimal_table_in_catalog;
    use crate::transaction::{ApplyTransactionAction, Transaction};

    /// Builds a 100-byte Parquet data file in partition `0`.
    ///
    /// Both its path (`test/<record_count>.parquet`) and its row count come
    /// from `record_count`.
    fn file_with_rows(record_count: u64) -> DataFile {
        DataFileBuilder::default()
            .content(DataContentType::Data)
            .file_path(format!("test/{record_count}.parquet"))
            .file_format(DataFileFormat::Parquet)
            .file_size_in_bytes(100)
            .record_count(record_count)
            .partition(Struct::from_iter([Some(Literal::long(0))]))
            .partition_spec_id(0)
            .build()
            .unwrap()
    }

    #[tokio::test]
    async fn test_fast_append_with_row_lineage() {
        let catalog = new_memory_catalog().await;
        let table = make_v3_minimal_table_in_catalog(&catalog).await;

        // Before any append, row-id assignment starts at zero.
        assert_eq!(table.metadata().next_row_id(), 0);

        // First commit: a single 30-row file.
        let tx = Transaction::new(&table);
        let table = tx
            .fast_append()
            .add_data_files(vec![file_with_rows(30)])
            .apply(tx)
            .unwrap()
            .commit(&catalog)
            .await
            .unwrap();

        let first_snapshot = table.metadata().current_snapshot().unwrap();
        assert_eq!(first_snapshot.first_row_id(), Some(0));
        assert_eq!(table.metadata().next_row_id(), 30);

        let first_manifests = first_snapshot
            .load_manifest_list(table.file_io(), table.metadata())
            .await
            .unwrap();
        assert_eq!(first_manifests.entries().len(), 1);
        assert_eq!(first_manifests.entries()[0].first_row_id, Some(0));

        // Second commit: 17 + 11 rows, whose ids continue after the first 30.
        let tx = Transaction::new(&table);
        let table = tx
            .fast_append()
            .add_data_files(vec![file_with_rows(17), file_with_rows(11)])
            .apply(tx)
            .unwrap()
            .commit(&catalog)
            .await
            .unwrap();

        let second_snapshot = table.metadata().current_snapshot().unwrap();
        assert_eq!(second_snapshot.first_row_id(), Some(30));
        assert_eq!(table.metadata().next_row_id(), 30 + 17 + 11);

        let second_manifests = second_snapshot
            .load_manifest_list(table.file_io(), table.metadata())
            .await
            .unwrap();
        assert_eq!(second_manifests.entries().len(), 2);
        assert_eq!(second_manifests.entries()[1].first_row_id, Some(30));
    }
}