iceberg/transaction/
action.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::mem::take;
19use std::sync::Arc;
20
21use as_any::AsAny;
22use async_trait::async_trait;
23
24use crate::table::Table;
25use crate::transaction::Transaction;
26use crate::{Result, TableRequirement, TableUpdate};
27
/// A shared, thread-safe handle to a type-erased `TransactionAction`.
///
/// Despite the `Boxed` prefix in its name, the alias wraps the trait object in
/// an `Arc`, not a `Box`.
pub(crate) type BoxedTransactionAction = Arc<dyn TransactionAction>;
30
31/// A trait representing an atomic action that can be part of a transaction.
32///
33/// Implementors of this trait define how a specific action is committed to a table.
34/// Each action is responsible for generating the updates and requirements needed
35/// to modify the table metadata.
36#[async_trait]
37pub(crate) trait TransactionAction: AsAny + Sync + Send {
38    /// Commits this action against the provided table and returns the resulting updates.
39    /// NOTE: This function is intended for internal use only and should not be called directly by users.
40    ///
41    /// # Arguments
42    ///
43    /// * `table` - The current state of the table this action should apply to.
44    ///
45    /// # Returns
46    ///
47    /// An `ActionCommit` containing table updates and table requirements,
48    /// or an error if the commit fails.
49    async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit>;
50}
51
/// A helper trait for attaching an action to a [`Transaction`].
///
/// A blanket implementation in this module covers every `'static`
/// `TransactionAction` type, which allows an action to be added to a
/// transaction with `action.apply(tx)`.
pub trait ApplyTransactionAction {
    /// Adds this action to the given transaction.
    ///
    /// Both the action and the transaction are taken by value; the
    /// transaction is handed back to the caller on success.
    ///
    /// # Arguments
    ///
    /// * `tx` - The transaction to apply the action to.
    ///
    /// # Returns
    ///
    /// The modified transaction containing this action, or an error if the operation fails.
    fn apply(self, tx: Transaction) -> Result<Transaction>;
}
68
69impl<T: TransactionAction + 'static> ApplyTransactionAction for T {
70    fn apply(self, mut tx: Transaction) -> Result<Transaction>
71    where Self: Sized {
72        tx.actions.push(Arc::new(self));
73        Ok(tx)
74    }
75}
76
77/// The result of committing a `TransactionAction`.
78///
79/// This struct contains the updates to apply to the table's metadata
80/// and any preconditions that must be satisfied before the update can be committed.
81pub struct ActionCommit {
82    updates: Vec<TableUpdate>,
83    requirements: Vec<TableRequirement>,
84}
85
86impl ActionCommit {
87    /// Creates a new `ActionCommit` from the given updates and requirements.
88    pub fn new(updates: Vec<TableUpdate>, requirements: Vec<TableRequirement>) -> Self {
89        Self {
90            updates,
91            requirements,
92        }
93    }
94
95    /// Consumes and returns the list of table updates.
96    pub fn take_updates(&mut self) -> Vec<TableUpdate> {
97        take(&mut self.updates)
98    }
99
100    /// Consumes and returns the list of table requirements.
101    pub fn take_requirements(&mut self) -> Vec<TableRequirement> {
102        take(&mut self.requirements)
103    }
104}
105
#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::sync::Arc;

    use as_any::Downcast;
    use async_trait::async_trait;
    use uuid::Uuid;

    use crate::table::Table;
    use crate::transaction::Transaction;
    use crate::transaction::action::{ActionCommit, ApplyTransactionAction, TransactionAction};
    use crate::transaction::tests::make_v2_table;
    use crate::{Result, TableRequirement, TableUpdate};

    /// Table location used by the fixtures in this module.
    const TEST_LOCATION: &str = "s3://bucket/prefix/table/";
    /// UUID (in string form) that `TestAction` places in its requirement.
    const TEST_UUID: &str = "9c12d441-03fe-4693-9a96-a0705ddf69c1";

    /// Minimal action whose commit yields one fixed update and one fixed requirement.
    struct TestAction;

    #[async_trait]
    impl TransactionAction for TestAction {
        async fn commit(self: Arc<Self>, _table: &Table) -> Result<ActionCommit> {
            let updates = vec![TableUpdate::SetLocation {
                location: String::from(TEST_LOCATION),
            }];
            let requirements = vec![TableRequirement::UuidMatch {
                uuid: Uuid::from_str(TEST_UUID)?,
            }];
            Ok(ActionCommit::new(updates, requirements))
        }
    }

    /// Committing `TestAction` yields the expected first update and first requirement.
    #[tokio::test]
    async fn test_commit_transaction_action() {
        let table = make_v2_table();

        let mut committed = Arc::new(TestAction).commit(&table).await.unwrap();
        let updates = committed.take_updates();
        let requirements = committed.take_requirements();

        let expected_update = TableUpdate::SetLocation {
            location: String::from(TEST_LOCATION),
        };
        let expected_requirement = TableRequirement::UuidMatch {
            uuid: Uuid::from_str(TEST_UUID).unwrap(),
        };
        assert_eq!(updates[0], expected_update);
        assert_eq!(requirements[0], expected_requirement);
    }

    /// Applying an action stores exactly one action of the same concrete type.
    #[test]
    fn test_apply_transaction_action() {
        let table = make_v2_table();

        let tx = TestAction.apply(Transaction::new(&table)).unwrap();
        // There should be one action in the transaction now
        assert_eq!(tx.actions.len(), 1);

        let stored = (*tx.actions[0]).downcast_ref::<TestAction>();
        stored.expect("TestAction was not applied to Transaction!");
    }

    /// `take_*` returns the stored values once and empty lists afterwards.
    #[test]
    fn test_action_commit() {
        // Create dummy updates and requirements
        let updates = vec![TableUpdate::SetLocation {
            location: String::from(TEST_LOCATION),
        }];
        let requirements = vec![TableRequirement::UuidMatch {
            uuid: Uuid::new_v4(),
        }];

        let mut commit = ActionCommit::new(updates.clone(), requirements.clone());

        // The first take hands back exactly what was stored
        assert_eq!(commit.take_updates(), updates);
        assert_eq!(commit.take_requirements(), requirements);

        // A second take finds nothing left
        assert!(commit.take_updates().is_empty());
        assert!(commit.take_requirements().is_empty());
    }
}