iceberg/transaction/
sort_order.rs1use std::sync::Arc;
19
20use async_trait::async_trait;
21
22use crate::error::Result;
23use crate::spec::{NullOrder, SchemaRef, SortDirection, SortField, SortOrder, Transform};
24use crate::table::Table;
25use crate::transaction::{ActionCommit, TransactionAction};
26use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
27
28#[derive(Debug, PartialEq, Eq, Clone)]
32struct PendingSortField {
33 name: String,
34 direction: SortDirection,
35 null_order: NullOrder,
36}
37
38impl PendingSortField {
39 fn to_sort_field(&self, schema: &SchemaRef) -> Result<SortField> {
40 let field_id = schema.field_id_by_name(self.name.as_str()).ok_or_else(|| {
41 Error::new(
42 ErrorKind::DataInvalid,
43 format!("Cannot find field {} in table schema", self.name),
44 )
45 })?;
46
47 Ok(SortField::builder()
48 .source_id(field_id)
49 .transform(Transform::Identity)
50 .direction(self.direction)
51 .null_order(self.null_order)
52 .build())
53 }
54}
55
56pub struct ReplaceSortOrderAction {
58 pending_sort_fields: Vec<PendingSortField>,
59}
60
61impl ReplaceSortOrderAction {
62 pub fn new() -> Self {
63 ReplaceSortOrderAction {
64 pending_sort_fields: vec![],
65 }
66 }
67
68 pub fn asc(self, name: &str, null_order: NullOrder) -> Self {
70 self.add_sort_field(name, SortDirection::Ascending, null_order)
71 }
72
73 pub fn desc(self, name: &str, null_order: NullOrder) -> Self {
75 self.add_sort_field(name, SortDirection::Descending, null_order)
76 }
77
78 fn add_sort_field(
79 mut self,
80 name: &str,
81 sort_direction: SortDirection,
82 null_order: NullOrder,
83 ) -> Self {
84 self.pending_sort_fields.push(PendingSortField {
85 name: name.to_string(),
86 direction: sort_direction,
87 null_order,
88 });
89
90 self
91 }
92}
93
94impl Default for ReplaceSortOrderAction {
95 fn default() -> Self {
96 Self::new()
97 }
98}
99
100#[async_trait]
101impl TransactionAction for ReplaceSortOrderAction {
102 async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
103 let current_schema = table.metadata().current_schema();
104 let sort_fields: Result<Vec<SortField>> = self
105 .pending_sort_fields
106 .iter()
107 .map(|p| p.to_sort_field(current_schema))
108 .collect();
109
110 let bound_sort_order = SortOrder::builder()
111 .with_fields(sort_fields?)
112 .build(current_schema)?;
113
114 let updates = vec![
115 TableUpdate::AddSortOrder {
116 sort_order: bound_sort_order,
117 },
118 TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
119 ];
120
121 let requirements = vec![
122 TableRequirement::CurrentSchemaIdMatch {
123 current_schema_id: current_schema.schema_id(),
124 },
125 TableRequirement::DefaultSortOrderIdMatch {
126 default_sort_order_id: table.metadata().default_sort_order().order_id,
127 },
128 ];
129
130 Ok(ActionCommit::new(updates, requirements))
131 }
132}
133
134#[cfg(test)]
135mod tests {
136 use as_any::Downcast;
137
138 use crate::spec::{NullOrder, SortDirection};
139 use crate::transaction::sort_order::{PendingSortField, ReplaceSortOrderAction};
140 use crate::transaction::tests::make_v2_table;
141 use crate::transaction::{ApplyTransactionAction, Transaction};
142
143 #[test]
144 fn test_replace_sort_order() {
145 let table = make_v2_table();
146 let tx = Transaction::new(&table);
147 let replace_sort_order = tx.replace_sort_order();
148
149 let tx = replace_sort_order
150 .asc("x", NullOrder::First)
151 .desc("y", NullOrder::Last)
152 .apply(tx)
153 .unwrap();
154
155 let replace_sort_order = (*tx.actions[0])
156 .downcast_ref::<ReplaceSortOrderAction>()
157 .unwrap();
158
159 assert_eq!(replace_sort_order.pending_sort_fields, vec![
160 PendingSortField {
161 name: String::from("x"),
162 direction: SortDirection::Ascending,
163 null_order: NullOrder::First,
164 },
165 PendingSortField {
166 name: String::from("y"),
167 direction: SortDirection::Descending,
168 null_order: NullOrder::Last,
169 }
170 ]);
171 }
172}