iceberg/transaction/
action.rs1use std::mem::take;
19use std::sync::Arc;
20
21use as_any::AsAny;
22use async_trait::async_trait;
23
24use crate::table::Table;
25use crate::transaction::Transaction;
26use crate::{Result, TableRequirement, TableUpdate};
27
28pub(crate) type BoxedTransactionAction = Arc<dyn TransactionAction>;
30
31#[async_trait]
37pub(crate) trait TransactionAction: AsAny + Sync + Send {
38 async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit>;
50}
51
52pub trait ApplyTransactionAction {
57 fn apply(self, tx: Transaction) -> Result<Transaction>;
67}
68
69impl<T: TransactionAction + 'static> ApplyTransactionAction for T {
70 fn apply(self, mut tx: Transaction) -> Result<Transaction>
71 where Self: Sized {
72 tx.actions.push(Arc::new(self));
73 Ok(tx)
74 }
75}
76
77pub struct ActionCommit {
82 updates: Vec<TableUpdate>,
83 requirements: Vec<TableRequirement>,
84}
85
86impl ActionCommit {
87 pub fn new(updates: Vec<TableUpdate>, requirements: Vec<TableRequirement>) -> Self {
89 Self {
90 updates,
91 requirements,
92 }
93 }
94
95 pub fn take_updates(&mut self) -> Vec<TableUpdate> {
97 take(&mut self.updates)
98 }
99
100 pub fn take_requirements(&mut self) -> Vec<TableRequirement> {
102 take(&mut self.requirements)
103 }
104}
105
106#[cfg(test)]
107mod tests {
108 use std::str::FromStr;
109 use std::sync::Arc;
110
111 use as_any::Downcast;
112 use async_trait::async_trait;
113 use uuid::Uuid;
114
115 use crate::table::Table;
116 use crate::transaction::Transaction;
117 use crate::transaction::action::{ActionCommit, ApplyTransactionAction, TransactionAction};
118 use crate::transaction::tests::make_v2_table;
119 use crate::{Result, TableRequirement, TableUpdate};
120
121 struct TestAction;
122
123 #[async_trait]
124 impl TransactionAction for TestAction {
125 async fn commit(self: Arc<Self>, _table: &Table) -> Result<ActionCommit> {
126 Ok(ActionCommit::new(
127 vec![TableUpdate::SetLocation {
128 location: String::from("s3://bucket/prefix/table/"),
129 }],
130 vec![TableRequirement::UuidMatch {
131 uuid: Uuid::from_str("9c12d441-03fe-4693-9a96-a0705ddf69c1")?,
132 }],
133 ))
134 }
135 }
136
137 #[tokio::test]
138 async fn test_commit_transaction_action() {
139 let table = make_v2_table();
140 let action = TestAction;
141
142 let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
143
144 let updates = action_commit.take_updates();
145 let requirements = action_commit.take_requirements();
146
147 assert_eq!(updates[0], TableUpdate::SetLocation {
148 location: String::from("s3://bucket/prefix/table/")
149 });
150 assert_eq!(requirements[0], TableRequirement::UuidMatch {
151 uuid: Uuid::from_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap()
152 });
153 }
154
155 #[test]
156 fn test_apply_transaction_action() {
157 let table = make_v2_table();
158 let action = TestAction;
159 let tx = Transaction::new(&table);
160
161 let updated_tx = action.apply(tx).unwrap();
162 assert_eq!(updated_tx.actions.len(), 1);
164
165 (*updated_tx.actions[0])
166 .downcast_ref::<TestAction>()
167 .expect("TestAction was not applied to Transaction!");
168 }
169
170 #[test]
171 fn test_action_commit() {
172 let location = String::from("s3://bucket/prefix/table/");
174 let uuid = Uuid::new_v4();
175 let updates = vec![TableUpdate::SetLocation { location }];
176 let requirements = vec![TableRequirement::UuidMatch { uuid }];
177
178 let mut action_commit = ActionCommit::new(updates.clone(), requirements.clone());
179
180 let taken_updates = action_commit.take_updates();
181 let taken_requirements = action_commit.take_requirements();
182
183 assert_eq!(taken_updates, updates);
185 assert_eq!(taken_requirements, requirements);
186
187 assert!(action_commit.take_updates().is_empty());
188 assert!(action_commit.take_requirements().is_empty());
189 }
190}