iceberg/transaction/
sort_order.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use async_trait::async_trait;
21
22use crate::error::Result;
23use crate::spec::{NullOrder, SchemaRef, SortDirection, SortField, SortOrder, Transform};
24use crate::table::Table;
25use crate::transaction::{ActionCommit, TransactionAction};
26use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
27
28/// Represents a sort field whose construction and validation are deferred until commit time.
29/// This avoids the need to pass a `Table` reference into methods like `asc` or `desc` when
30/// adding sort orders.
31#[derive(Debug, PartialEq, Eq, Clone)]
32struct PendingSortField {
33    name: String,
34    direction: SortDirection,
35    null_order: NullOrder,
36}
37
38impl PendingSortField {
39    fn to_sort_field(&self, schema: &SchemaRef) -> Result<SortField> {
40        let field_id = schema.field_id_by_name(self.name.as_str()).ok_or_else(|| {
41            Error::new(
42                ErrorKind::DataInvalid,
43                format!("Cannot find field {} in table schema", self.name),
44            )
45        })?;
46
47        Ok(SortField::builder()
48            .source_id(field_id)
49            .transform(Transform::Identity)
50            .direction(self.direction)
51            .null_order(self.null_order)
52            .build())
53    }
54}
55
56/// Transaction action for replacing sort order.
57pub struct ReplaceSortOrderAction {
58    pending_sort_fields: Vec<PendingSortField>,
59}
60
61impl ReplaceSortOrderAction {
62    pub fn new() -> Self {
63        ReplaceSortOrderAction {
64            pending_sort_fields: vec![],
65        }
66    }
67
68    /// Adds a field for sorting in ascending order.
69    pub fn asc(self, name: &str, null_order: NullOrder) -> Self {
70        self.add_sort_field(name, SortDirection::Ascending, null_order)
71    }
72
73    /// Adds a field for sorting in descending order.
74    pub fn desc(self, name: &str, null_order: NullOrder) -> Self {
75        self.add_sort_field(name, SortDirection::Descending, null_order)
76    }
77
78    fn add_sort_field(
79        mut self,
80        name: &str,
81        sort_direction: SortDirection,
82        null_order: NullOrder,
83    ) -> Self {
84        self.pending_sort_fields.push(PendingSortField {
85            name: name.to_string(),
86            direction: sort_direction,
87            null_order,
88        });
89
90        self
91    }
92}
93
94impl Default for ReplaceSortOrderAction {
95    fn default() -> Self {
96        Self::new()
97    }
98}
99
100#[async_trait]
101impl TransactionAction for ReplaceSortOrderAction {
102    async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
103        let current_schema = table.metadata().current_schema();
104        let sort_fields: Result<Vec<SortField>> = self
105            .pending_sort_fields
106            .iter()
107            .map(|p| p.to_sort_field(current_schema))
108            .collect();
109
110        let bound_sort_order = SortOrder::builder()
111            .with_fields(sort_fields?)
112            .build(current_schema)?;
113
114        let updates = vec![
115            TableUpdate::AddSortOrder {
116                sort_order: bound_sort_order,
117            },
118            TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
119        ];
120
121        let requirements = vec![
122            TableRequirement::CurrentSchemaIdMatch {
123                current_schema_id: current_schema.schema_id(),
124            },
125            TableRequirement::DefaultSortOrderIdMatch {
126                default_sort_order_id: table.metadata().default_sort_order().order_id,
127            },
128        ];
129
130        Ok(ActionCommit::new(updates, requirements))
131    }
132}
133
#[cfg(test)]
mod tests {
    use as_any::Downcast;

    use crate::spec::{NullOrder, SortDirection};
    use crate::transaction::sort_order::{PendingSortField, ReplaceSortOrderAction};
    use crate::transaction::tests::make_v2_table;
    use crate::transaction::{ApplyTransactionAction, Transaction};

    /// Applying the action to a transaction must record the queued fields, in order,
    /// with the direction and null ordering requested by the caller.
    #[test]
    fn test_replace_sort_order() {
        let expected = vec![
            PendingSortField {
                name: String::from("x"),
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            },
            PendingSortField {
                name: String::from("y"),
                direction: SortDirection::Descending,
                null_order: NullOrder::Last,
            },
        ];

        let table = make_v2_table();
        let tx = Transaction::new(&table);
        let action = tx.replace_sort_order();

        let tx = action
            .asc("x", NullOrder::First)
            .desc("y", NullOrder::Last)
            .apply(tx)
            .unwrap();

        // The first (and only) recorded action must be the sort-order replacement.
        let recorded = (*tx.actions[0])
            .downcast_ref::<ReplaceSortOrderAction>()
            .unwrap();

        assert_eq!(recorded.pending_sort_fields, expected);
    }
}