iceberg/transaction/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! This module contains the transaction API.
//!
//! The transaction API enables changes to be made to an existing table.
//!
//! Note that this may also have side effects, such as producing new manifest
//! files.
//!
//! Below is a basic example using the "fast-append" action:
//!
//! ```ignore
//! use iceberg::transaction::{ApplyTransactionAction, Transaction};
//! use iceberg::Catalog;
//!
//! // Create a transaction. `Transaction::new` borrows the table.
//! let tx = Transaction::new(&my_table);
//!
//! // Create a `FastAppendAction` which will not rewrite or append
//! // to existing metadata. This will create a new manifest.
//! let action = tx.fast_append().add_data_files(my_data_files);
//!
//! // Apply the fast-append action to the given transaction, returning
//! // the newly updated `Transaction`.
//! let tx = action.apply(tx).unwrap();
//!
//! // End the transaction by committing to an `iceberg::Catalog`
//! // implementation. This will cause a table update to occur.
//! let table = tx
//!     .commit(&some_catalog_impl)
//!     .await
//!     .unwrap();
//! ```
50
51/// The `ApplyTransactionAction` trait provides an `apply` method
52/// that allows users to apply a transaction action to a `Transaction`.
53mod action;
54
55pub use action::*;
56mod append;
57mod snapshot;
58mod sort_order;
59mod update_location;
60mod update_properties;
61mod update_statistics;
62mod upgrade_format_version;
63
64use std::sync::Arc;
65use std::time::Duration;
66
67use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWithContext};
68
69use crate::error::Result;
70use crate::spec::TableProperties;
71use crate::table::Table;
72use crate::transaction::action::BoxedTransactionAction;
73use crate::transaction::append::FastAppendAction;
74use crate::transaction::sort_order::ReplaceSortOrderAction;
75use crate::transaction::update_location::UpdateLocationAction;
76use crate::transaction::update_properties::UpdatePropertiesAction;
77use crate::transaction::update_statistics::UpdateStatisticsAction;
78use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction;
79use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
80
81/// Table transaction.
82#[derive(Clone)]
83pub struct Transaction {
84    table: Table,
85    actions: Vec<BoxedTransactionAction>,
86}
87
88impl Transaction {
89    /// Creates a new transaction.
90    pub fn new(table: &Table) -> Self {
91        Self {
92            table: table.clone(),
93            actions: vec![],
94        }
95    }
96
97    fn update_table_metadata(table: Table, updates: &[TableUpdate]) -> Result<Table> {
98        let mut metadata_builder = table.metadata().clone().into_builder(None);
99        for update in updates {
100            metadata_builder = update.clone().apply(metadata_builder)?;
101        }
102
103        Ok(table.with_metadata(Arc::new(metadata_builder.build()?.metadata)))
104    }
105
106    /// Applies an [`ActionCommit`] to the given [`Table`], returning a new [`Table`] with updated metadata.
107    /// Also appends any derived [`TableUpdate`]s and [`TableRequirement`]s to the provided vectors.
108    fn apply(
109        table: Table,
110        mut action_commit: ActionCommit,
111        existing_updates: &mut Vec<TableUpdate>,
112        existing_requirements: &mut Vec<TableRequirement>,
113    ) -> Result<Table> {
114        let updates = action_commit.take_updates();
115        let requirements = action_commit.take_requirements();
116
117        for requirement in &requirements {
118            requirement.check(Some(table.metadata()))?;
119        }
120
121        let updated_table = Self::update_table_metadata(table, &updates)?;
122
123        existing_updates.extend(updates);
124        existing_requirements.extend(requirements);
125
126        Ok(updated_table)
127    }
128
129    /// Sets table to a new version.
130    pub fn upgrade_table_version(&self) -> UpgradeFormatVersionAction {
131        UpgradeFormatVersionAction::new()
132    }
133
134    /// Update table's property.
135    pub fn update_table_properties(&self) -> UpdatePropertiesAction {
136        UpdatePropertiesAction::new()
137    }
138
139    /// Creates a fast append action.
140    pub fn fast_append(&self) -> FastAppendAction {
141        FastAppendAction::new()
142    }
143
144    /// Creates replace sort order action.
145    pub fn replace_sort_order(&self) -> ReplaceSortOrderAction {
146        ReplaceSortOrderAction::new()
147    }
148
149    /// Set the location of table
150    pub fn update_location(&self) -> UpdateLocationAction {
151        UpdateLocationAction::new()
152    }
153
154    /// Update the statistics of table
155    pub fn update_statistics(&self) -> UpdateStatisticsAction {
156        UpdateStatisticsAction::new()
157    }
158
159    /// Commit transaction.
160    pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
161        if self.actions.is_empty() {
162            // nothing to commit
163            return Ok(self.table);
164        }
165
166        let table_props =
167            TableProperties::try_from(self.table.metadata().properties()).map_err(|e| {
168                Error::new(ErrorKind::DataInvalid, "Invalid table properties").with_source(e)
169            })?;
170
171        let backoff = Self::build_backoff(table_props)?;
172        let tx = self;
173
174        (|mut tx: Transaction| async {
175            let result = tx.do_commit(catalog).await;
176            (tx, result)
177        })
178        .retry(backoff)
179        .sleep(tokio::time::sleep)
180        .context(tx)
181        .when(|e| e.retryable())
182        .await
183        .1
184    }
185
186    fn build_backoff(props: TableProperties) -> Result<ExponentialBackoff> {
187        Ok(ExponentialBuilder::new()
188            .with_min_delay(Duration::from_millis(props.commit_min_retry_wait_ms))
189            .with_max_delay(Duration::from_millis(props.commit_max_retry_wait_ms))
190            .with_total_delay(Some(Duration::from_millis(
191                props.commit_total_retry_timeout_ms,
192            )))
193            .with_max_times(props.commit_num_retries)
194            .with_factor(2.0)
195            .build())
196    }
197
198    async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result<Table> {
199        let refreshed = catalog.load_table(self.table.identifier()).await?;
200
201        if self.table.metadata() != refreshed.metadata()
202            || self.table.metadata_location() != refreshed.metadata_location()
203        {
204            // current base is stale, use refreshed as base and re-apply transaction actions
205            self.table = refreshed.clone();
206        }
207
208        let mut current_table = self.table.clone();
209        let mut existing_updates: Vec<TableUpdate> = vec![];
210        let mut existing_requirements: Vec<TableRequirement> = vec![];
211
212        for action in &self.actions {
213            let action_commit = Arc::clone(action).commit(&current_table).await?;
214            // apply action commit to current_table
215            current_table = Self::apply(
216                current_table,
217                action_commit,
218                &mut existing_updates,
219                &mut existing_requirements,
220            )?;
221        }
222
223        let table_commit = TableCommit::builder()
224            .ident(self.table.identifier().to_owned())
225            .updates(existing_updates)
226            .requirements(existing_requirements)
227            .build();
228
229        catalog.update_table(table_commit).await
230    }
231}
232
233#[cfg(test)]
234mod tests {
235    use std::collections::HashMap;
236    use std::fs::File;
237    use std::io::BufReader;
238    use std::sync::Arc;
239    use std::sync::atomic::{AtomicU32, Ordering};
240
241    use crate::catalog::MockCatalog;
242    use crate::io::FileIOBuilder;
243    use crate::spec::TableMetadata;
244    use crate::table::Table;
245    use crate::transaction::{ApplyTransactionAction, Transaction};
246    use crate::{Catalog, Error, ErrorKind, TableCreation, TableIdent};
247
248    pub fn make_v1_table() -> Table {
249        let file = File::open(format!(
250            "{}/testdata/table_metadata/{}",
251            env!("CARGO_MANIFEST_DIR"),
252            "TableMetadataV1Valid.json"
253        ))
254        .unwrap();
255        let reader = BufReader::new(file);
256        let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
257
258        Table::builder()
259            .metadata(resp)
260            .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
261            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
262            .file_io(FileIOBuilder::new("memory").build().unwrap())
263            .build()
264            .unwrap()
265    }
266
267    pub fn make_v2_table() -> Table {
268        let file = File::open(format!(
269            "{}/testdata/table_metadata/{}",
270            env!("CARGO_MANIFEST_DIR"),
271            "TableMetadataV2Valid.json"
272        ))
273        .unwrap();
274        let reader = BufReader::new(file);
275        let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
276
277        Table::builder()
278            .metadata(resp)
279            .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
280            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
281            .file_io(FileIOBuilder::new("memory").build().unwrap())
282            .build()
283            .unwrap()
284    }
285
286    pub fn make_v2_minimal_table() -> Table {
287        let file = File::open(format!(
288            "{}/testdata/table_metadata/{}",
289            env!("CARGO_MANIFEST_DIR"),
290            "TableMetadataV2ValidMinimal.json"
291        ))
292        .unwrap();
293        let reader = BufReader::new(file);
294        let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
295
296        Table::builder()
297            .metadata(resp)
298            .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
299            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
300            .file_io(FileIOBuilder::new("memory").build().unwrap())
301            .build()
302            .unwrap()
303    }
304
305    pub(crate) async fn make_v3_minimal_table_in_catalog(catalog: &impl Catalog) -> Table {
306        let table_ident =
307            TableIdent::from_strs([format!("ns1-{}", uuid::Uuid::new_v4()), "test1".to_string()])
308                .unwrap();
309
310        catalog
311            .create_namespace(table_ident.namespace(), HashMap::new())
312            .await
313            .unwrap();
314
315        let file = File::open(format!(
316            "{}/testdata/table_metadata/{}",
317            env!("CARGO_MANIFEST_DIR"),
318            "TableMetadataV3ValidMinimal.json"
319        ))
320        .unwrap();
321        let reader = BufReader::new(file);
322        let base_metadata = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
323
324        let table_creation = TableCreation::builder()
325            .schema((**base_metadata.current_schema()).clone())
326            .partition_spec((**base_metadata.default_partition_spec()).clone())
327            .sort_order((**base_metadata.default_sort_order()).clone())
328            .name(table_ident.name().to_string())
329            .format_version(crate::spec::FormatVersion::V3)
330            .build();
331
332        catalog
333            .create_table(table_ident.namespace(), table_creation)
334            .await
335            .unwrap()
336    }
337
338    /// Helper function to create a test table with retry properties
339    pub(super) fn setup_test_table(num_retries: &str) -> Table {
340        let table = make_v2_table();
341
342        // Set retry properties
343        let mut props = HashMap::new();
344        props.insert("commit.retry.min-wait-ms".to_string(), "10".to_string());
345        props.insert("commit.retry.max-wait-ms".to_string(), "100".to_string());
346        props.insert(
347            "commit.retry.total-timeout-ms".to_string(),
348            "1000".to_string(),
349        );
350        props.insert(
351            "commit.retry.num-retries".to_string(),
352            num_retries.to_string(),
353        );
354
355        // Update table properties
356        let metadata = table
357            .metadata()
358            .clone()
359            .into_builder(None)
360            .set_properties(props)
361            .unwrap()
362            .build()
363            .unwrap()
364            .metadata;
365
366        table.with_metadata(Arc::new(metadata))
367    }
368
369    /// Helper function to create a transaction with a simple update action
370    fn create_test_transaction(table: &Table) -> Transaction {
371        let tx = Transaction::new(table);
372        tx.update_table_properties()
373            .set("test.key".to_string(), "test.value".to_string())
374            .apply(tx)
375            .unwrap()
376    }
377
378    /// Helper function to set up a mock catalog with retryable errors
379    fn setup_mock_catalog_with_retryable_errors(
380        success_after_attempts: Option<u32>,
381        expected_calls: usize,
382    ) -> MockCatalog {
383        let mut mock_catalog = MockCatalog::new();
384
385        mock_catalog
386            .expect_load_table()
387            .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));
388
389        let attempts = AtomicU32::new(0);
390        mock_catalog
391            .expect_update_table()
392            .times(expected_calls)
393            .returning_st(move |_| {
394                if let Some(success_after_attempts) = success_after_attempts {
395                    attempts.fetch_add(1, Ordering::SeqCst);
396                    if attempts.load(Ordering::SeqCst) <= success_after_attempts {
397                        Box::pin(async move {
398                            Err(
399                                Error::new(ErrorKind::CatalogCommitConflicts, "Commit conflict")
400                                    .with_retryable(true),
401                            )
402                        })
403                    } else {
404                        Box::pin(async move { Ok(make_v2_table()) })
405                    }
406                } else {
407                    // Always fail with retryable error
408                    Box::pin(async move {
409                        Err(
410                            Error::new(ErrorKind::CatalogCommitConflicts, "Commit conflict")
411                                .with_retryable(true),
412                        )
413                    })
414                }
415            });
416
417        mock_catalog
418    }
419
420    /// Helper function to set up a mock catalog with non-retryable error
421    fn setup_mock_catalog_with_non_retryable_error() -> MockCatalog {
422        let mut mock_catalog = MockCatalog::new();
423
424        mock_catalog
425            .expect_load_table()
426            .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));
427
428        mock_catalog
429            .expect_update_table()
430            .times(1) // Should only be called once since error is not retryable
431            .returning_st(move |_| {
432                Box::pin(async move {
433                    Err(Error::new(ErrorKind::Unexpected, "Non-retryable error")
434                        .with_retryable(false))
435                })
436            });
437
438        mock_catalog
439    }
440
441    #[tokio::test]
442    async fn test_commit_retryable_error() {
443        // Create a test table with retry properties
444        let table = setup_test_table("3");
445
446        // Create a transaction with a simple update action
447        let tx = create_test_transaction(&table);
448
449        // Create a mock catalog that fails twice then succeeds
450        let mock_catalog = setup_mock_catalog_with_retryable_errors(Some(2), 3);
451
452        // Commit the transaction
453        let result = tx.commit(&mock_catalog).await;
454
455        // Verify the result
456        assert!(result.is_ok(), "Transaction should eventually succeed");
457    }
458
459    #[tokio::test]
460    async fn test_commit_non_retryable_error() {
461        // Create a test table with retry properties
462        let table = setup_test_table("3");
463
464        // Create a transaction with a simple update action
465        let tx = create_test_transaction(&table);
466
467        // Create a mock catalog that fails with non-retryable error
468        let mock_catalog = setup_mock_catalog_with_non_retryable_error();
469
470        // Commit the transaction
471        let result = tx.commit(&mock_catalog).await;
472
473        // Verify the result
474        assert!(result.is_err(), "Transaction should fail immediately");
475        if let Err(err) = result {
476            assert_eq!(err.kind(), ErrorKind::Unexpected);
477            assert_eq!(err.message(), "Non-retryable error");
478            assert!(!err.retryable(), "Error should not be retryable");
479        }
480    }
481
482    #[tokio::test]
483    async fn test_commit_max_retries_exceeded() {
484        // Create a test table with retry properties (only allow 2 retries)
485        let table = setup_test_table("2");
486
487        // Create a transaction with a simple update action
488        let tx = create_test_transaction(&table);
489
490        // Create a mock catalog that always fails with retryable error
491        let mock_catalog = setup_mock_catalog_with_retryable_errors(None, 3); // Initial attempt + 2 retries = 3 total attempts
492
493        // Commit the transaction
494        let result = tx.commit(&mock_catalog).await;
495
496        // Verify the result
497        assert!(result.is_err(), "Transaction should fail after max retries");
498        if let Err(err) = result {
499            assert_eq!(err.kind(), ErrorKind::CatalogCommitConflicts);
500            assert_eq!(err.message(), "Commit conflict");
501            assert!(err.retryable(), "Error should be retryable");
502        }
503    }
504}
505
506#[cfg(test)]
507mod test_row_lineage {
508    use crate::memory::tests::new_memory_catalog;
509    use crate::spec::{
510        DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, Struct,
511    };
512    use crate::transaction::tests::make_v3_minimal_table_in_catalog;
513    use crate::transaction::{ApplyTransactionAction, Transaction};
514
515    #[tokio::test]
516    async fn test_fast_append_with_row_lineage() {
517        // Helper function to create a data file with specified number of rows
518        fn file_with_rows(record_count: u64) -> DataFile {
519            DataFileBuilder::default()
520                .content(DataContentType::Data)
521                .file_path(format!("test/{}.parquet", record_count))
522                .file_format(DataFileFormat::Parquet)
523                .file_size_in_bytes(100)
524                .record_count(record_count)
525                .partition(Struct::from_iter([Some(Literal::long(0))]))
526                .partition_spec_id(0)
527                .build()
528                .unwrap()
529        }
530        let catalog = new_memory_catalog().await;
531
532        let table = make_v3_minimal_table_in_catalog(&catalog).await;
533
534        // Check initial state - next_row_id should be 0
535        assert_eq!(table.metadata().next_row_id(), 0);
536
537        // First fast append with 30 rows
538        let tx = Transaction::new(&table);
539        let data_file_30 = file_with_rows(30);
540        let action = tx.fast_append().add_data_files(vec![data_file_30]);
541        let tx = action.apply(tx).unwrap();
542        let table = tx.commit(&catalog).await.unwrap();
543
544        // Check snapshot and table state after first append
545        let snapshot = table.metadata().current_snapshot().unwrap();
546        assert_eq!(snapshot.first_row_id(), Some(0));
547        assert_eq!(table.metadata().next_row_id(), 30);
548
549        // Check written manifest for first_row_id
550        let manifest_list = table
551            .metadata()
552            .current_snapshot()
553            .unwrap()
554            .load_manifest_list(table.file_io(), table.metadata())
555            .await
556            .unwrap();
557
558        assert_eq!(manifest_list.entries().len(), 1);
559        let manifest_file = &manifest_list.entries()[0];
560        assert_eq!(manifest_file.first_row_id, Some(0));
561
562        // Second fast append with 17 and 11 rows
563        let tx = Transaction::new(&table);
564        let data_file_17 = file_with_rows(17);
565        let data_file_11 = file_with_rows(11);
566        let action = tx
567            .fast_append()
568            .add_data_files(vec![data_file_17, data_file_11]);
569        let tx = action.apply(tx).unwrap();
570        let table = tx.commit(&catalog).await.unwrap();
571
572        // Check snapshot and table state after second append
573        let snapshot = table.metadata().current_snapshot().unwrap();
574        assert_eq!(snapshot.first_row_id(), Some(30));
575        assert_eq!(table.metadata().next_row_id(), 30 + 17 + 11);
576
577        // Check written manifest for first_row_id
578        let manifest_list = table
579            .metadata()
580            .current_snapshot()
581            .unwrap()
582            .load_manifest_list(table.file_io(), table.metadata())
583            .await
584            .unwrap();
585        assert_eq!(manifest_list.entries().len(), 2);
586        let manifest_file = &manifest_list.entries()[1];
587        assert_eq!(manifest_file.first_row_id, Some(30));
588    }
589}