iceberg/transaction/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! This module contains the transaction API.
19//!
20//! The transaction API enables changes to be made to an existing table.
21//!
22//! Note that this may also have side effects, such as producing new manifest
23//! files.
24//!
25//! Below is a basic example using the "fast-append" action:
26//!
27//! ```ignore
28//! use iceberg::transaction::{ApplyTransactionAction, Transaction};
29//! use iceberg::Catalog;
30//!
31//! // Create a transaction.
//! let tx = Transaction::new(&my_table);
33//!
34//! // Create a `FastAppendAction` which will not rewrite or append
35//! // to existing metadata. This will create a new manifest.
36//! let action = tx.fast_append().add_data_files(my_data_files);
37//!
38//! // Apply the fast-append action to the given transaction, returning
39//! // the newly updated `Transaction`.
40//! let tx = action.apply(tx).unwrap();
//!
43//! // End the transaction by committing to an `iceberg::Catalog`
44//! // implementation. This will cause a table update to occur.
45//! let table = tx
46//!     .commit(&some_catalog_impl)
47//!     .await
48//!     .unwrap();
49//! ```
50
/// Transaction actions, including the `ApplyTransactionAction` trait whose `apply`
/// method lets users apply a transaction action to a `Transaction`.
53mod action;
54
55pub use action::*;
56mod append;
57mod snapshot;
58mod sort_order;
59mod update_location;
60mod update_properties;
61mod update_statistics;
62mod upgrade_format_version;
63
64use std::sync::Arc;
65use std::time::Duration;
66
67use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWithContext};
68
69use crate::error::Result;
70use crate::spec::TableProperties;
71use crate::table::Table;
72use crate::transaction::action::BoxedTransactionAction;
73use crate::transaction::append::FastAppendAction;
74use crate::transaction::sort_order::ReplaceSortOrderAction;
75use crate::transaction::update_location::UpdateLocationAction;
76use crate::transaction::update_properties::UpdatePropertiesAction;
77use crate::transaction::update_statistics::UpdateStatisticsAction;
78use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction;
79use crate::{Catalog, TableCommit, TableRequirement, TableUpdate};
80
81/// Table transaction.
82#[derive(Clone)]
83pub struct Transaction {
84    table: Table,
85    actions: Vec<BoxedTransactionAction>,
86}
87
88impl Transaction {
89    /// Creates a new transaction.
90    pub fn new(table: &Table) -> Self {
91        Self {
92            table: table.clone(),
93            actions: vec![],
94        }
95    }
96
97    fn update_table_metadata(table: Table, updates: &[TableUpdate]) -> Result<Table> {
98        let mut metadata_builder = table.metadata().clone().into_builder(None);
99        for update in updates {
100            metadata_builder = update.clone().apply(metadata_builder)?;
101        }
102
103        Ok(table.with_metadata(Arc::new(metadata_builder.build()?.metadata)))
104    }
105
106    /// Applies an [`ActionCommit`] to the given [`Table`], returning a new [`Table`] with updated metadata.
107    /// Also appends any derived [`TableUpdate`]s and [`TableRequirement`]s to the provided vectors.
108    fn apply(
109        table: Table,
110        mut action_commit: ActionCommit,
111        existing_updates: &mut Vec<TableUpdate>,
112        existing_requirements: &mut Vec<TableRequirement>,
113    ) -> Result<Table> {
114        let updates = action_commit.take_updates();
115        let requirements = action_commit.take_requirements();
116
117        for requirement in &requirements {
118            requirement.check(Some(table.metadata()))?;
119        }
120
121        let updated_table = Self::update_table_metadata(table, &updates)?;
122
123        existing_updates.extend(updates);
124        existing_requirements.extend(requirements);
125
126        Ok(updated_table)
127    }
128
129    /// Sets table to a new version.
130    pub fn upgrade_table_version(&self) -> UpgradeFormatVersionAction {
131        UpgradeFormatVersionAction::new()
132    }
133
134    /// Update table's property.
135    pub fn update_table_properties(&self) -> UpdatePropertiesAction {
136        UpdatePropertiesAction::new()
137    }
138
139    /// Creates a fast append action.
140    pub fn fast_append(&self) -> FastAppendAction {
141        FastAppendAction::new()
142    }
143
144    /// Creates replace sort order action.
145    pub fn replace_sort_order(&self) -> ReplaceSortOrderAction {
146        ReplaceSortOrderAction::new()
147    }
148
149    /// Set the location of table
150    pub fn update_location(&self) -> UpdateLocationAction {
151        UpdateLocationAction::new()
152    }
153
154    /// Update the statistics of table
155    pub fn update_statistics(&self) -> UpdateStatisticsAction {
156        UpdateStatisticsAction::new()
157    }
158
159    /// Commit transaction.
160    pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
161        if self.actions.is_empty() {
162            // nothing to commit
163            return Ok(self.table);
164        }
165
166        let table_props = self.table.metadata().table_properties()?;
167
168        let backoff = Self::build_backoff(table_props)?;
169        let tx = self;
170
171        (|mut tx: Transaction| async {
172            let result = tx.do_commit(catalog).await;
173            (tx, result)
174        })
175        .retry(backoff)
176        .sleep(tokio::time::sleep)
177        .context(tx)
178        .when(|e| e.retryable())
179        .await
180        .1
181    }
182
183    fn build_backoff(props: TableProperties) -> Result<ExponentialBackoff> {
184        Ok(ExponentialBuilder::new()
185            .with_min_delay(Duration::from_millis(props.commit_min_retry_wait_ms))
186            .with_max_delay(Duration::from_millis(props.commit_max_retry_wait_ms))
187            .with_total_delay(Some(Duration::from_millis(
188                props.commit_total_retry_timeout_ms,
189            )))
190            .with_max_times(props.commit_num_retries)
191            .with_factor(2.0)
192            .build())
193    }
194
195    async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result<Table> {
196        let refreshed = catalog.load_table(self.table.identifier()).await?;
197
198        if self.table.metadata() != refreshed.metadata()
199            || self.table.metadata_location() != refreshed.metadata_location()
200        {
201            // current base is stale, use refreshed as base and re-apply transaction actions
202            self.table = refreshed.clone();
203        }
204
205        let mut current_table = self.table.clone();
206        let mut existing_updates: Vec<TableUpdate> = vec![];
207        let mut existing_requirements: Vec<TableRequirement> = vec![];
208
209        for action in &self.actions {
210            let action_commit = Arc::clone(action).commit(&current_table).await?;
211            // apply action commit to current_table
212            current_table = Self::apply(
213                current_table,
214                action_commit,
215                &mut existing_updates,
216                &mut existing_requirements,
217            )?;
218        }
219
220        let table_commit = TableCommit::builder()
221            .ident(self.table.identifier().to_owned())
222            .updates(existing_updates)
223            .requirements(existing_requirements)
224            .build();
225
226        catalog.update_table(table_commit).await
227    }
228}
229
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::fs::File;
    use std::io::BufReader;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU32, Ordering};

    use crate::catalog::MockCatalog;
    use crate::io::FileIOBuilder;
    use crate::spec::TableMetadata;
    use crate::table::Table;
    use crate::transaction::{ApplyTransactionAction, Transaction};
    use crate::{Catalog, Error, ErrorKind, TableCreation, TableIdent};

    /// Reads and deserializes `testdata/table_metadata/<file_name>` relative to the
    /// crate root. Panics if the file is missing or is not valid table metadata.
    fn load_test_metadata(file_name: &str) -> TableMetadata {
        let path = format!(
            "{}/testdata/table_metadata/{}",
            env!("CARGO_MANIFEST_DIR"),
            file_name
        );
        let reader = BufReader::new(File::open(path).unwrap());
        serde_json::from_reader::<_, TableMetadata>(reader).unwrap()
    }

    /// Builds a `ns1.test1` [`Table`] from the test metadata file `file_name`.
    /// The table uses the `memory` FileIO scheme and a fixed metadata location.
    fn make_table_from_metadata_file(file_name: &str) -> Table {
        Table::builder()
            .metadata(load_test_metadata(file_name))
            .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
            .file_io(FileIOBuilder::new("memory").build().unwrap())
            .build()
            .unwrap()
    }

    /// Test table backed by `TableMetadataV1Valid.json`.
    pub fn make_v1_table() -> Table {
        make_table_from_metadata_file("TableMetadataV1Valid.json")
    }

    /// Test table backed by `TableMetadataV2Valid.json`.
    pub fn make_v2_table() -> Table {
        make_table_from_metadata_file("TableMetadataV2Valid.json")
    }

    /// Test table backed by `TableMetadataV2ValidMinimal.json`.
    pub fn make_v2_minimal_table() -> Table {
        make_table_from_metadata_file("TableMetadataV2ValidMinimal.json")
    }

    /// Creates a format-v3 table in `catalog`. Its schema, partition spec and sort
    /// order are taken from `TableMetadataV3ValidMinimal.json`. The table lives in a
    /// freshly created, uniquely named namespace (`ns1-<uuid>`), so repeated calls
    /// against the same catalog do not collide.
    pub(crate) async fn make_v3_minimal_table_in_catalog(catalog: &impl Catalog) -> Table {
        let table_ident =
            TableIdent::from_strs([format!("ns1-{}", uuid::Uuid::new_v4()), "test1".to_string()])
                .unwrap();

        catalog
            .create_namespace(table_ident.namespace(), HashMap::new())
            .await
            .unwrap();

        let base_metadata = load_test_metadata("TableMetadataV3ValidMinimal.json");

        let table_creation = TableCreation::builder()
            .schema((**base_metadata.current_schema()).clone())
            .partition_spec((**base_metadata.default_partition_spec()).clone())
            .sort_order((**base_metadata.default_sort_order()).clone())
            .name(table_ident.name().to_string())
            .format_version(crate::spec::FormatVersion::V3)
            .build();

        catalog
            .create_table(table_ident.namespace(), table_creation)
            .await
            .unwrap()
    }

    /// Helper function to create a test table with retry properties.
    ///
    /// Returns the v2 test table with short commit-retry waits and
    /// `commit.retry.num-retries` set to `num_retries`.
    pub(super) fn setup_test_table(num_retries: &str) -> Table {
        let table = make_v2_table();

        // Small waits and timeout keep the retry tests fast.
        let props = HashMap::from([
            ("commit.retry.min-wait-ms".to_string(), "10".to_string()),
            ("commit.retry.max-wait-ms".to_string(), "100".to_string()),
            (
                "commit.retry.total-timeout-ms".to_string(),
                "1000".to_string(),
            ),
            (
                "commit.retry.num-retries".to_string(),
                num_retries.to_string(),
            ),
        ]);

        let metadata = table
            .metadata()
            .clone()
            .into_builder(None)
            .set_properties(props)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        table.with_metadata(Arc::new(metadata))
    }

    /// Helper function to create a transaction with a simple update action.
    fn create_test_transaction(table: &Table) -> Transaction {
        let tx = Transaction::new(table);
        tx.update_table_properties()
            .set("test.key".to_string(), "test.value".to_string())
            .apply(tx)
            .unwrap()
    }

    /// The retryable error the mock catalogs return to simulate a commit conflict.
    fn commit_conflict_error() -> Error {
        Error::new(ErrorKind::CatalogCommitConflicts, "Commit conflict").with_retryable(true)
    }

    /// Helper function to set up a mock catalog with retryable errors.
    ///
    /// `update_table` must be called exactly `expected_calls` times.
    /// - With `Some(n)`, the first `n` calls fail with a retryable commit conflict and
    ///   later calls succeed.
    /// - With `None`, every call fails with a retryable commit conflict.
    fn setup_mock_catalog_with_retryable_errors(
        success_after_attempts: Option<u32>,
        expected_calls: usize,
    ) -> MockCatalog {
        let mut mock_catalog = MockCatalog::new();

        mock_catalog
            .expect_load_table()
            .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));

        let attempts = AtomicU32::new(0);
        mock_catalog
            .expect_update_table()
            .times(expected_calls)
            .returning_st(move |_| {
                // 1-based index of this `update_table` call.
                let attempt = attempts.fetch_add(1, Ordering::SeqCst) + 1;
                let succeeds =
                    success_after_attempts.is_some_and(|failures| attempt > failures);
                if succeeds {
                    Box::pin(async move { Ok(make_v2_table()) })
                } else {
                    Box::pin(async move { Err(commit_conflict_error()) })
                }
            });

        mock_catalog
    }

    /// Helper function to set up a mock catalog with non-retryable error.
    fn setup_mock_catalog_with_non_retryable_error() -> MockCatalog {
        let mut mock_catalog = MockCatalog::new();

        mock_catalog
            .expect_load_table()
            .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));

        mock_catalog
            .expect_update_table()
            .times(1) // Should only be called once since error is not retryable
            .returning_st(move |_| {
                Box::pin(async move {
                    Err(Error::new(ErrorKind::Unexpected, "Non-retryable error")
                        .with_retryable(false))
                })
            });

        mock_catalog
    }

    #[tokio::test]
    async fn test_commit_retryable_error() {
        let table = setup_test_table("3");
        let tx = create_test_transaction(&table);

        // Fails twice, then succeeds on the third call.
        let mock_catalog = setup_mock_catalog_with_retryable_errors(Some(2), 3);

        let result = tx.commit(&mock_catalog).await;

        assert!(result.is_ok(), "Transaction should eventually succeed");
    }

    #[tokio::test]
    async fn test_commit_non_retryable_error() {
        let table = setup_test_table("3");
        let tx = create_test_transaction(&table);
        let mock_catalog = setup_mock_catalog_with_non_retryable_error();

        let Err(err) = tx.commit(&mock_catalog).await else {
            panic!("Transaction should fail immediately");
        };

        assert_eq!(err.kind(), ErrorKind::Unexpected);
        assert_eq!(err.message(), "Non-retryable error");
        assert!(!err.retryable(), "Error should not be retryable");
    }

    #[tokio::test]
    async fn test_commit_max_retries_exceeded() {
        // Only allow 2 retries.
        let table = setup_test_table("2");
        let tx = create_test_transaction(&table);

        // Always fails: initial attempt + 2 retries = 3 total calls.
        let mock_catalog = setup_mock_catalog_with_retryable_errors(None, 3);

        let Err(err) = tx.commit(&mock_catalog).await else {
            panic!("Transaction should fail after max retries");
        };

        assert_eq!(err.kind(), ErrorKind::CatalogCommitConflicts);
        assert_eq!(err.message(), "Commit conflict");
        assert!(err.retryable(), "Error should be retryable");
    }
}
502
#[cfg(test)]
mod test_row_lineage {
    use crate::memory::tests::new_memory_catalog;
    use crate::spec::{
        DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, Struct,
    };
    use crate::transaction::tests::make_v3_minimal_table_in_catalog;
    use crate::transaction::{ApplyTransactionAction, Transaction};

    /// Builds a Parquet [`DataFile`] entry at `test/<record_count>.parquet` that
    /// reports `record_count` rows. The entry uses partition spec 0 with a single
    /// `long` partition value of 0. Only the metadata entry is built; no file is
    /// written.
    fn data_file_with_rows(record_count: u64) -> DataFile {
        DataFileBuilder::default()
            .content(DataContentType::Data)
            .file_path(format!("test/{record_count}.parquet"))
            .file_format(DataFileFormat::Parquet)
            .file_size_in_bytes(100)
            .record_count(record_count)
            .partition(Struct::from_iter([Some(Literal::long(0))]))
            .partition_spec_id(0)
            .build()
            .unwrap()
    }

    #[tokio::test]
    async fn test_fast_append_with_row_lineage() {
        let catalog = new_memory_catalog().await;
        let table = make_v3_minimal_table_in_catalog(&catalog).await;

        // A freshly created table has not handed out any row ids.
        assert_eq!(table.metadata().next_row_id(), 0);

        // Commit 1: a single 30-row file.
        let tx = Transaction::new(&table);
        let tx = tx
            .fast_append()
            .add_data_files(vec![data_file_with_rows(30)])
            .apply(tx)
            .unwrap();
        let table = tx.commit(&catalog).await.unwrap();

        let snapshot = table.metadata().current_snapshot().unwrap();
        assert_eq!(snapshot.first_row_id(), Some(0));
        assert_eq!(table.metadata().next_row_id(), 30);

        // The one manifest written so far starts at row id 0.
        let manifests = snapshot
            .load_manifest_list(table.file_io(), table.metadata())
            .await
            .unwrap();
        assert_eq!(manifests.entries().len(), 1);
        assert_eq!(manifests.entries()[0].first_row_id, Some(0));

        // Commit 2: two files of 17 and 11 rows in a single append.
        let tx = Transaction::new(&table);
        let tx = tx
            .fast_append()
            .add_data_files(vec![data_file_with_rows(17), data_file_with_rows(11)])
            .apply(tx)
            .unwrap();
        let table = tx.commit(&catalog).await.unwrap();

        let snapshot = table.metadata().current_snapshot().unwrap();
        assert_eq!(snapshot.first_row_id(), Some(30));
        assert_eq!(table.metadata().next_row_id(), 30 + 17 + 11);

        // The manifest at index 1 is expected to start where commit 1 left off.
        let manifests = snapshot
            .load_manifest_list(table.file_io(), table.metadata())
            .await
            .unwrap();
        assert_eq!(manifests.entries().len(), 2);
        assert_eq!(manifests.entries()[1].first_row_id, Some(30));
    }
}