iceberg/transaction/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! This module contains the transaction API.
19//!
20//! The transaction API enables changes to be made to an existing table.
21//!
22//! Note that this may also have side effects, such as producing new manifest
23//! files.
24//!
25//! Below is a basic example using the "fast-append" action:
26//!
27//! ```ignore
28//! use iceberg::transaction::{ApplyTransactionAction, Transaction};
29//! use iceberg::Catalog;
30//!
//! // Create a transaction against an existing table.
//! let tx = Transaction::new(&my_table);
33//!
34//! // Create a `FastAppendAction` which will not rewrite or append
35//! // to existing metadata. This will create a new manifest.
36//! let action = tx.fast_append().add_data_files(my_data_files);
37//!
38//! // Apply the fast-append action to the given transaction, returning
39//! // the newly updated `Transaction`.
40//! let tx = action.apply(tx).unwrap();
//!
43//! // End the transaction by committing to an `iceberg::Catalog`
44//! // implementation. This will cause a table update to occur.
45//! let table = tx
46//!     .commit(&some_catalog_impl)
47//!     .await
48//!     .unwrap();
49//! ```
50
/// Transaction actions, including the `ApplyTransactionAction` trait whose `apply`
/// method lets users apply a transaction action to a `Transaction`.
53mod action;
54
55pub use action::*;
56mod append;
57mod snapshot;
58mod sort_order;
59mod update_location;
60mod update_properties;
61mod update_schema;
62mod update_statistics;
63mod upgrade_format_version;
64
65use std::sync::Arc;
66use std::time::Duration;
67
68use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWithContext};
69pub use update_schema::AddColumn;
70
71use crate::error::Result;
72use crate::spec::TableProperties;
73use crate::table::Table;
74use crate::transaction::action::BoxedTransactionAction;
75use crate::transaction::append::FastAppendAction;
76use crate::transaction::sort_order::ReplaceSortOrderAction;
77use crate::transaction::update_location::UpdateLocationAction;
78use crate::transaction::update_properties::UpdatePropertiesAction;
79use crate::transaction::update_schema::UpdateSchemaAction;
80use crate::transaction::update_statistics::UpdateStatisticsAction;
81use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction;
82use crate::{Catalog, TableCommit, TableRequirement, TableUpdate};
83
84/// Table transaction.
85#[derive(Clone)]
86pub struct Transaction {
87    table: Table,
88    actions: Vec<BoxedTransactionAction>,
89}
90
91impl Transaction {
92    /// Creates a new transaction.
93    pub fn new(table: &Table) -> Self {
94        Self {
95            table: table.clone(),
96            actions: vec![],
97        }
98    }
99
100    fn update_table_metadata(table: Table, updates: &[TableUpdate]) -> Result<Table> {
101        let mut metadata_builder = table.metadata().clone().into_builder(None);
102        for update in updates {
103            metadata_builder = update.clone().apply(metadata_builder)?;
104        }
105
106        Ok(table.with_metadata(Arc::new(metadata_builder.build()?.metadata)))
107    }
108
109    /// Applies an [`ActionCommit`] to the given [`Table`], returning a new [`Table`] with updated metadata.
110    /// Also appends any derived [`TableUpdate`]s and [`TableRequirement`]s to the provided vectors.
111    fn apply(
112        table: Table,
113        mut action_commit: ActionCommit,
114        existing_updates: &mut Vec<TableUpdate>,
115        existing_requirements: &mut Vec<TableRequirement>,
116    ) -> Result<Table> {
117        let updates = action_commit.take_updates();
118        let requirements = action_commit.take_requirements();
119
120        for requirement in &requirements {
121            requirement.check(Some(table.metadata()))?;
122        }
123
124        let updated_table = Self::update_table_metadata(table, &updates)?;
125
126        existing_updates.extend(updates);
127        existing_requirements.extend(requirements);
128
129        Ok(updated_table)
130    }
131
132    /// Sets table to a new version.
133    pub fn upgrade_table_version(&self) -> UpgradeFormatVersionAction {
134        UpgradeFormatVersionAction::new()
135    }
136
137    /// Update table's property.
138    pub fn update_table_properties(&self) -> UpdatePropertiesAction {
139        UpdatePropertiesAction::new()
140    }
141
142    /// Creates an update schema action.
143    pub fn update_schema(&self) -> UpdateSchemaAction {
144        UpdateSchemaAction::new()
145    }
146
147    /// Creates a fast append action.
148    pub fn fast_append(&self) -> FastAppendAction {
149        FastAppendAction::new()
150    }
151
152    /// Creates replace sort order action.
153    pub fn replace_sort_order(&self) -> ReplaceSortOrderAction {
154        ReplaceSortOrderAction::new()
155    }
156
157    /// Set the location of table
158    pub fn update_location(&self) -> UpdateLocationAction {
159        UpdateLocationAction::new()
160    }
161
162    /// Update the statistics of table
163    pub fn update_statistics(&self) -> UpdateStatisticsAction {
164        UpdateStatisticsAction::new()
165    }
166
167    /// Commit transaction.
168    pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
169        if self.actions.is_empty() {
170            // nothing to commit
171            return Ok(self.table);
172        }
173
174        let table_props = self.table.metadata().table_properties()?;
175
176        let backoff = Self::build_backoff(table_props)?;
177        let tx = self;
178
179        (|mut tx: Transaction| async {
180            let result = tx.do_commit(catalog).await;
181            (tx, result)
182        })
183        .retry(backoff)
184        .sleep(tokio::time::sleep)
185        .context(tx)
186        .when(|e| e.retryable())
187        .await
188        .1
189    }
190
191    fn build_backoff(props: TableProperties) -> Result<ExponentialBackoff> {
192        Ok(ExponentialBuilder::new()
193            .with_min_delay(Duration::from_millis(props.commit_min_retry_wait_ms))
194            .with_max_delay(Duration::from_millis(props.commit_max_retry_wait_ms))
195            .with_total_delay(Some(Duration::from_millis(
196                props.commit_total_retry_timeout_ms,
197            )))
198            .with_max_times(props.commit_num_retries)
199            .with_factor(2.0)
200            .build())
201    }
202
203    async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result<Table> {
204        let refreshed = catalog.load_table(self.table.identifier()).await?;
205
206        if self.table.metadata() != refreshed.metadata()
207            || self.table.metadata_location() != refreshed.metadata_location()
208        {
209            // current base is stale, use refreshed as base and re-apply transaction actions
210            self.table = refreshed.clone();
211        }
212
213        let mut current_table = self.table.clone();
214        let mut existing_updates: Vec<TableUpdate> = vec![];
215        let mut existing_requirements: Vec<TableRequirement> = vec![];
216
217        for action in &self.actions {
218            let action_commit = Arc::clone(action).commit(&current_table).await?;
219            // apply action commit to current_table
220            current_table = Self::apply(
221                current_table,
222                action_commit,
223                &mut existing_updates,
224                &mut existing_requirements,
225            )?;
226        }
227
228        let table_commit = TableCommit::builder()
229            .ident(self.table.identifier().to_owned())
230            .updates(existing_updates)
231            .requirements(existing_requirements)
232            .build();
233
234        catalog.update_table(table_commit).await
235    }
236}
237
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::fs::File;
    use std::io::BufReader;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU32, Ordering};

    use crate::catalog::MockCatalog;
    use crate::io::FileIO;
    use crate::memory::tests::new_memory_catalog;
    use crate::spec::{
        DataContentType, DataFileBuilder, DataFileFormat, Literal, Struct, TableMetadata,
    };
    use crate::table::Table;
    use crate::test_utils::test_runtime;
    use crate::transaction::{ApplyTransactionAction, Transaction};
    use crate::{Catalog, Error, ErrorKind, TableCreation, TableIdent};

    /// Loads `testdata/table_metadata/<file_name>` from the crate root and deserializes
    /// it as [`TableMetadata`].
    ///
    /// Panics, naming the offending path, if the fixture cannot be opened or parsed.
    fn load_test_metadata(file_name: &str) -> TableMetadata {
        let path = format!(
            "{}/testdata/table_metadata/{file_name}",
            env!("CARGO_MANIFEST_DIR")
        );
        let file = File::open(&path).unwrap_or_else(|e| panic!("failed to open {path}: {e}"));
        serde_json::from_reader(BufReader::new(file))
            .unwrap_or_else(|e| panic!("failed to parse {path}: {e}"))
    }

    /// Builds a standalone `ns1.test1` table from the given metadata fixture.
    ///
    /// The table is not registered in any catalog and uses in-memory file IO.
    fn make_table_from_fixture(file_name: &str) -> Table {
        Table::builder()
            .metadata(load_test_metadata(file_name))
            .metadata_location("s3://bucket/test/location/metadata/v1.json")
            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
            .file_io(FileIO::new_with_memory())
            .runtime(test_runtime())
            .build()
            .unwrap()
    }

    /// Builds a standalone table from the `TableMetadataV1Valid.json` fixture.
    pub fn make_v1_table() -> Table {
        make_table_from_fixture("TableMetadataV1Valid.json")
    }

    /// Builds a standalone table from the `TableMetadataV2Valid.json` fixture.
    pub fn make_v2_table() -> Table {
        make_table_from_fixture("TableMetadataV2Valid.json")
    }

    /// Builds a standalone table from the `TableMetadataV2ValidMinimal.json` fixture.
    pub fn make_v2_minimal_table() -> Table {
        make_table_from_fixture("TableMetadataV2ValidMinimal.json")
    }

    /// Creates a fresh v3 table named `test1` in `catalog`.
    ///
    /// The table lives in a uniquely named namespace and reuses the schema, partition
    /// spec and sort order of the `TableMetadataV3ValidMinimal.json` fixture.
    pub(crate) async fn make_v3_minimal_table_in_catalog(catalog: &impl Catalog) -> Table {
        let table_ident =
            TableIdent::from_strs([format!("ns1-{}", uuid::Uuid::new_v4()), "test1".to_string()])
                .unwrap();

        catalog
            .create_namespace(table_ident.namespace(), HashMap::new())
            .await
            .unwrap();

        let base_metadata = load_test_metadata("TableMetadataV3ValidMinimal.json");

        let table_creation = TableCreation::builder()
            .schema((**base_metadata.current_schema()).clone())
            .partition_spec((**base_metadata.default_partition_spec()).clone())
            .sort_order((**base_metadata.default_sort_order()).clone())
            .name(table_ident.name().to_string())
            .format_version(crate::spec::FormatVersion::V3)
            .build();

        catalog
            .create_table(table_ident.namespace(), table_creation)
            .await
            .unwrap()
    }

    /// Returns [`make_v2_table`] with fast commit-retry settings.
    ///
    /// The settings are: 10ms minimum wait, 100ms maximum wait, 1s total timeout,
    /// and `num_retries` retries.
    pub(super) fn setup_test_table(num_retries: &str) -> Table {
        let table = make_v2_table();

        let props = HashMap::from([
            ("commit.retry.min-wait-ms".to_string(), "10".to_string()),
            ("commit.retry.max-wait-ms".to_string(), "100".to_string()),
            (
                "commit.retry.total-timeout-ms".to_string(),
                "1000".to_string(),
            ),
            (
                "commit.retry.num-retries".to_string(),
                num_retries.to_string(),
            ),
        ]);

        let metadata = table
            .metadata()
            .clone()
            .into_builder(None)
            .set_properties(props)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        table.with_metadata(Arc::new(metadata))
    }

    /// Creates a transaction on `table` holding a single property-update action.
    fn create_test_transaction(table: &Table) -> Transaction {
        let tx = Transaction::new(table);
        tx.update_table_properties()
            .set("test.key".to_string(), "test.value".to_string())
            .apply(tx)
            .unwrap()
    }

    /// Builds a mock catalog whose `update_table` fails with a retryable conflict.
    ///
    /// With `Some(n)`, the first `n` calls fail and later calls succeed. With `None`,
    /// every call fails. `update_table` must be called exactly `expected_calls` times.
    fn setup_mock_catalog_with_retryable_errors(
        success_after_attempts: Option<u32>,
        expected_calls: usize,
    ) -> MockCatalog {
        let mut mock_catalog = MockCatalog::new();

        mock_catalog
            .expect_load_table()
            .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));

        let attempts = AtomicU32::new(0);
        mock_catalog
            .expect_update_table()
            .times(expected_calls)
            .returning_st(move |_| {
                // `fetch_add` returns the previous value, so this is the 1-based
                // number of the current attempt.
                let attempt = attempts.fetch_add(1, Ordering::SeqCst) + 1;
                let should_fail = match success_after_attempts {
                    Some(limit) => attempt <= limit,
                    None => true,
                };

                if should_fail {
                    Box::pin(async move {
                        Err(
                            Error::new(ErrorKind::CatalogCommitConflicts, "Commit conflict")
                                .with_retryable(true),
                        )
                    })
                } else {
                    Box::pin(async move { Ok(make_v2_table()) })
                }
            });

        mock_catalog
    }

    /// Builds a mock catalog whose `update_table` fails once with a non-retryable error.
    fn setup_mock_catalog_with_non_retryable_error() -> MockCatalog {
        let mut mock_catalog = MockCatalog::new();

        mock_catalog
            .expect_load_table()
            .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));

        mock_catalog
            .expect_update_table()
            .times(1) // Should only be called once since error is not retryable
            .returning_st(move |_| {
                Box::pin(async move {
                    Err(Error::new(ErrorKind::Unexpected, "Non-retryable error")
                        .with_retryable(false))
                })
            });

        mock_catalog
    }

    #[tokio::test]
    async fn test_commit_retryable_error() {
        let table = setup_test_table("3");
        let tx = create_test_transaction(&table);

        // Fails twice, then succeeds on the third attempt.
        let mock_catalog = setup_mock_catalog_with_retryable_errors(Some(2), 3);

        let result = tx.commit(&mock_catalog).await;

        assert!(result.is_ok(), "Transaction should eventually succeed");
    }

    #[tokio::test]
    async fn test_commit_non_retryable_error() {
        let table = setup_test_table("3");
        let tx = create_test_transaction(&table);
        let mock_catalog = setup_mock_catalog_with_non_retryable_error();

        let Err(err) = tx.commit(&mock_catalog).await else {
            panic!("Transaction should fail immediately");
        };

        assert_eq!(err.kind(), ErrorKind::Unexpected);
        assert_eq!(err.message(), "Non-retryable error");
        assert!(!err.retryable(), "Error should not be retryable");
    }

    #[tokio::test]
    async fn test_commit_max_retries_exceeded() {
        // Only allow 2 retries.
        let table = setup_test_table("2");
        let tx = create_test_transaction(&table);

        // Always fails: initial attempt + 2 retries = 3 total attempts.
        let mock_catalog = setup_mock_catalog_with_retryable_errors(None, 3);

        let Err(err) = tx.commit(&mock_catalog).await else {
            panic!("Transaction should fail after max retries");
        };

        assert_eq!(err.kind(), ErrorKind::CatalogCommitConflicts);
        assert_eq!(err.message(), "Commit conflict");
        assert!(err.retryable(), "Error should be retryable");
    }

    #[tokio::test]
    async fn test_transaction_snapshot_summary() {
        let catalog = new_memory_catalog().await;
        let table = make_v3_minimal_table_in_catalog(&catalog).await;

        // Builds a transaction appending one uniquely named file of the given size.
        let mut file_seq = 0u32;
        let mut append_file = |table: &crate::table::Table, record_count: u64, file_size: u64| {
            file_seq += 1;
            let file = DataFileBuilder::default()
                .content(DataContentType::Data)
                .file_path(format!("test/{file_seq}.parquet"))
                .file_format(DataFileFormat::Parquet)
                .file_size_in_bytes(file_size)
                .record_count(record_count)
                .partition(Struct::from_iter([Some(Literal::long(1))]))
                .partition_spec_id(0)
                .build()
                .unwrap();
            let tx = Transaction::new(table);
            tx.fast_append()
                .add_data_files(vec![file])
                .apply(tx)
                .unwrap()
        };

        let table = append_file(&table, /*record_count=*/ 10, /*file_size=*/ 100)
            .commit(&catalog)
            .await
            .unwrap();
        let table = append_file(&table, /*record_count=*/ 20, /*file_size=*/ 200)
            .commit(&catalog)
            .await
            .unwrap();

        let summary = &table
            .metadata()
            .current_snapshot()
            .unwrap()
            .summary()
            .additional_properties;

        // Totals accumulate across both commits.
        assert_eq!(summary.get("total-records").unwrap(), "30");
        assert_eq!(summary.get("total-data-files").unwrap(), "2");
        assert_eq!(summary.get("total-files-size").unwrap(), "300");
    }
}
563
#[cfg(test)]
mod test_row_lineage {
    use crate::memory::tests::new_memory_catalog;
    use crate::spec::{
        DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, Struct,
    };
    use crate::transaction::tests::make_v3_minimal_table_in_catalog;
    use crate::transaction::{ApplyTransactionAction, Transaction};

    /// Builds a 100-byte Parquet data file in partition `0` holding `record_count` rows.
    ///
    /// The row count is also used as the file name, so two files with the same count
    /// would share a path.
    fn data_file(record_count: u64) -> DataFile {
        DataFileBuilder::default()
            .content(DataContentType::Data)
            .file_path(format!("test/{record_count}.parquet"))
            .file_format(DataFileFormat::Parquet)
            .file_size_in_bytes(100)
            .record_count(record_count)
            .partition(Struct::from_iter([Some(Literal::long(0))]))
            .partition_spec_id(0)
            .build()
            .unwrap()
    }

    #[tokio::test]
    async fn test_fast_append_with_row_lineage() {
        let catalog = new_memory_catalog().await;
        let table = make_v3_minimal_table_in_catalog(&catalog).await;

        // A freshly created table has not handed out any row ids yet.
        assert_eq!(table.metadata().next_row_id(), 0);

        // Commit #1: one file with 30 rows.
        let tx = Transaction::new(&table);
        let tx = tx
            .fast_append()
            .add_data_files(vec![data_file(30)])
            .apply(tx)
            .unwrap();
        let table = tx.commit(&catalog).await.unwrap();

        let snapshot = table.metadata().current_snapshot().unwrap();
        assert_eq!(snapshot.first_row_id(), Some(0));
        assert_eq!(table.metadata().next_row_id(), 30);

        // The written manifest records the first row id as well.
        let manifest_list = snapshot
            .load_manifest_list(table.file_io(), table.metadata())
            .await
            .unwrap();
        assert_eq!(manifest_list.entries().len(), 1);
        assert_eq!(manifest_list.entries()[0].first_row_id, Some(0));

        // Commit #2: two files (17 and 11 rows) in a single append. Row ids continue
        // from where commit #1 stopped.
        let tx = Transaction::new(&table);
        let tx = tx
            .fast_append()
            .add_data_files(vec![data_file(17), data_file(11)])
            .apply(tx)
            .unwrap();
        let table = tx.commit(&catalog).await.unwrap();

        let snapshot = table.metadata().current_snapshot().unwrap();
        assert_eq!(snapshot.first_row_id(), Some(30));
        assert_eq!(table.metadata().next_row_id(), 30 + 17 + 11);

        let manifest_list = snapshot
            .load_manifest_list(table.file_io(), table.metadata())
            .await
            .unwrap();
        assert_eq!(manifest_list.entries().len(), 2);
        assert_eq!(manifest_list.entries()[1].first_row_id, Some(30));
    }
}