1use core::fmt;
22use std::fmt::Formatter;
23use std::sync::Arc;
24
25use serde::{Deserialize, Serialize};
26use typed_builder::TypedBuilder;
27
28use super::transform::Transform;
29use crate::error::Result;
30use crate::spec::Schema;
31use crate::{Error, ErrorKind};
32
33pub type SortOrderRef = Arc<SortOrder>;
35#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Copy, Clone)]
36pub enum SortDirection {
38 #[serde(rename = "asc")]
40 Ascending,
41 #[serde(rename = "desc")]
43 Descending,
44}
45
46impl fmt::Display for SortDirection {
47 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
48 match *self {
49 SortDirection::Ascending => write!(f, "ascending"),
50 SortDirection::Descending => write!(f, "descending"),
51 }
52 }
53}
54
55#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Copy, Clone)]
56pub enum NullOrder {
58 #[serde(rename = "nulls-first")]
59 First,
61 #[serde(rename = "nulls-last")]
62 Last,
64}
65
66impl fmt::Display for NullOrder {
67 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
68 match *self {
69 NullOrder::First => write!(f, "first"),
70 NullOrder::Last => write!(f, "last"),
71 }
72 }
73}
74
75#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)]
76#[serde(rename_all = "kebab-case")]
77pub struct SortField {
79 pub source_id: i32,
81 pub transform: Transform,
83 pub direction: SortDirection,
85 pub null_order: NullOrder,
87}
88
89impl fmt::Display for SortField {
90 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
91 write!(
92 f,
93 "SortField {{ source_id: {}, transform: {}, direction: {}, null_order: {} }}",
94 self.source_id, self.transform, self.direction, self.null_order
95 )
96 }
97}
98
99#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Builder, Default)]
100#[serde(rename_all = "kebab-case")]
101#[builder(setter(prefix = "with"))]
102#[builder(build_fn(skip))]
103pub struct SortOrder {
106 #[builder(default)]
108 pub order_id: i64,
109 #[builder(setter(each(name = "with_sort_field")), default)]
111 pub fields: Vec<SortField>,
112}
113
114impl SortOrder {
115 pub(crate) const UNSORTED_ORDER_ID: i64 = 0;
116
117 pub fn builder() -> SortOrderBuilder {
119 SortOrderBuilder::default()
120 }
121
122 pub fn unsorted_order() -> SortOrder {
124 SortOrder {
125 order_id: SortOrder::UNSORTED_ORDER_ID,
126 fields: Vec::new(),
127 }
128 }
129
130 pub fn is_unsorted(&self) -> bool {
134 self.fields.is_empty()
135 }
136
137 pub fn with_order_id(self, order_id: i64) -> SortOrder {
139 SortOrder {
140 order_id,
141 fields: self.fields,
142 }
143 }
144}
145
146impl SortOrderBuilder {
147 pub fn build_unbound(&self) -> Result<SortOrder> {
149 let fields = self.fields.clone().unwrap_or_default();
150 match (self.order_id, fields.as_slice()) {
151 (Some(SortOrder::UNSORTED_ORDER_ID) | None, []) => Ok(SortOrder::unsorted_order()),
152 (_, []) => Err(Error::new(
153 ErrorKind::Unexpected,
154 format!("Unsorted order ID must be {}", SortOrder::UNSORTED_ORDER_ID),
155 )),
156 (Some(SortOrder::UNSORTED_ORDER_ID), [..]) => Err(Error::new(
157 ErrorKind::Unexpected,
158 format!(
159 "Sort order ID {} is reserved for unsorted order",
160 SortOrder::UNSORTED_ORDER_ID
161 ),
162 )),
163 (maybe_order_id, [..]) => Ok(SortOrder {
164 order_id: maybe_order_id.unwrap_or(1),
165 fields: fields.to_vec(),
166 }),
167 }
168 }
169
170 pub fn build(&self, schema: &Schema) -> Result<SortOrder> {
172 let unbound_sort_order = self.build_unbound()?;
173 SortOrderBuilder::check_compatibility(unbound_sort_order, schema)
174 }
175
176 fn check_compatibility(sort_order: SortOrder, schema: &Schema) -> Result<SortOrder> {
178 let sort_fields = &sort_order.fields;
179 for sort_field in sort_fields {
180 match schema.field_by_id(sort_field.source_id) {
181 None => {
182 return Err(Error::new(
183 ErrorKind::Unexpected,
184 format!("Cannot find source column for sort field: {sort_field}"),
185 ));
186 }
187 Some(source_field) => {
188 let source_type = source_field.field_type.as_ref();
189
190 if !source_type.is_primitive() {
191 return Err(Error::new(
192 ErrorKind::Unexpected,
193 format!("Cannot sort by non-primitive source field: {source_type}"),
194 ));
195 }
196
197 let field_transform = sort_field.transform;
198 if field_transform.result_type(source_type).is_err() {
199 return Err(Error::new(
200 ErrorKind::Unexpected,
201 format!(
202 "Invalid source type {source_type} for transform {field_transform}"
203 ),
204 ));
205 }
206 }
207 }
208 }
209
210 Ok(sort_order)
211 }
212}
213
214#[cfg(test)]
215mod tests {
216 use super::*;
217 use crate::spec::{ListType, NestedField, PrimitiveType, Type};
218
219 #[test]
220 fn test_sort_field() {
221 let spec = r#"
222 {
223 "transform": "bucket[4]",
224 "source-id": 3,
225 "direction": "desc",
226 "null-order": "nulls-last"
227 }
228 "#;
229
230 let field: SortField = serde_json::from_str(spec).unwrap();
231 assert_eq!(Transform::Bucket(4), field.transform);
232 assert_eq!(3, field.source_id);
233 assert_eq!(SortDirection::Descending, field.direction);
234 assert_eq!(NullOrder::Last, field.null_order);
235 }
236
237 #[test]
238 fn test_sort_order() {
239 let spec = r#"
240 {
241 "order-id": 1,
242 "fields": [ {
243 "transform": "identity",
244 "source-id": 2,
245 "direction": "asc",
246 "null-order": "nulls-first"
247 }, {
248 "transform": "bucket[4]",
249 "source-id": 3,
250 "direction": "desc",
251 "null-order": "nulls-last"
252 } ]
253 }
254 "#;
255
256 let order: SortOrder = serde_json::from_str(spec).unwrap();
257 assert_eq!(Transform::Identity, order.fields[0].transform);
258 assert_eq!(2, order.fields[0].source_id);
259 assert_eq!(SortDirection::Ascending, order.fields[0].direction);
260 assert_eq!(NullOrder::First, order.fields[0].null_order);
261
262 assert_eq!(Transform::Bucket(4), order.fields[1].transform);
263 assert_eq!(3, order.fields[1].source_id);
264 assert_eq!(SortDirection::Descending, order.fields[1].direction);
265 assert_eq!(NullOrder::Last, order.fields[1].null_order);
266 }
267
268 #[test]
269 fn test_build_unbound_should_return_err_if_unsorted_order_does_not_have_an_order_id_of_zero() {
270 assert_eq!(
271 SortOrder::builder()
272 .with_order_id(1)
273 .build_unbound()
274 .expect_err("Expected an Err value")
275 .message(),
276 "Unsorted order ID must be 0"
277 )
278 }
279
280 #[test]
281 fn test_build_unbound_should_return_err_if_order_id_equals_zero_is_used_for_anything_other_than_unsorted_order()
282 {
283 assert_eq!(
284 SortOrder::builder()
285 .with_order_id(SortOrder::UNSORTED_ORDER_ID)
286 .with_sort_field(
287 SortField::builder()
288 .source_id(2)
289 .direction(SortDirection::Ascending)
290 .null_order(NullOrder::First)
291 .transform(Transform::Identity)
292 .build()
293 )
294 .build_unbound()
295 .expect_err("Expected an Err value")
296 .message(),
297 "Sort order ID 0 is reserved for unsorted order"
298 )
299 }
300
301 #[test]
302 fn test_build_unbound_returns_correct_default_order_id_for_no_fields() {
303 assert_eq!(
304 SortOrder::builder()
305 .build_unbound()
306 .expect("Expected an Ok value")
307 .order_id,
308 SortOrder::UNSORTED_ORDER_ID
309 )
310 }
311
312 #[test]
313 fn test_build_unbound_returns_correct_default_order_id_for_fields() {
314 let sort_field = SortField::builder()
315 .source_id(2)
316 .direction(SortDirection::Ascending)
317 .null_order(NullOrder::First)
318 .transform(Transform::Identity)
319 .build();
320 assert_ne!(
321 SortOrder::builder()
322 .with_sort_field(sort_field.clone())
323 .build_unbound()
324 .expect("Expected an Ok value")
325 .order_id,
326 SortOrder::UNSORTED_ORDER_ID
327 )
328 }
329
330 #[test]
331 fn test_build_unbound_should_return_unsorted_sort_order() {
332 assert_eq!(
333 SortOrder::builder()
334 .with_order_id(SortOrder::UNSORTED_ORDER_ID)
335 .build_unbound()
336 .expect("Expected an Ok value"),
337 SortOrder::unsorted_order()
338 )
339 }
340
341 #[test]
342 fn test_build_unbound_should_return_sort_order_with_given_order_id_and_sort_fields() {
343 let sort_field = SortField::builder()
344 .source_id(2)
345 .direction(SortDirection::Ascending)
346 .null_order(NullOrder::First)
347 .transform(Transform::Identity)
348 .build();
349
350 assert_eq!(
351 SortOrder::builder()
352 .with_order_id(2)
353 .with_sort_field(sort_field.clone())
354 .build_unbound()
355 .expect("Expected an Ok value"),
356 SortOrder {
357 order_id: 2,
358 fields: vec![sort_field]
359 }
360 )
361 }
362
363 #[test]
364 fn test_build_unbound_should_return_sort_order_with_given_sort_fields_and_defaults_to_1_if_missing_an_order_id()
365 {
366 let sort_field = SortField::builder()
367 .source_id(2)
368 .direction(SortDirection::Ascending)
369 .null_order(NullOrder::First)
370 .transform(Transform::Identity)
371 .build();
372
373 assert_eq!(
374 SortOrder::builder()
375 .with_sort_field(sort_field.clone())
376 .build_unbound()
377 .expect("Expected an Ok value"),
378 SortOrder {
379 order_id: 1,
380 fields: vec![sort_field]
381 }
382 )
383 }
384
385 #[test]
386 fn test_build_should_return_err_if_sort_order_field_is_not_present_in_schema() {
387 let schema = Schema::builder()
388 .with_schema_id(1)
389 .with_fields(vec![
390 NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
391 ])
392 .build()
393 .unwrap();
394
395 let sort_order_builder_result = SortOrder::builder()
396 .with_sort_field(
397 SortField::builder()
398 .source_id(2)
399 .direction(SortDirection::Ascending)
400 .null_order(NullOrder::First)
401 .transform(Transform::Identity)
402 .build(),
403 )
404 .build(&schema);
405
406 assert_eq!(
407 sort_order_builder_result
408 .expect_err("Expected an Err value")
409 .message(),
410 "Cannot find source column for sort field: SortField { source_id: 2, transform: identity, direction: ascending, null_order: first }"
411 )
412 }
413
414 #[test]
415 fn test_build_should_return_err_if_source_field_is_not_a_primitive_type() {
416 let schema = Schema::builder()
417 .with_schema_id(1)
418 .with_fields(vec![
419 NestedField::required(
420 1,
421 "foo",
422 Type::List(ListType {
423 element_field: NestedField::list_element(
424 2,
425 Type::Primitive(PrimitiveType::String),
426 true,
427 )
428 .into(),
429 }),
430 )
431 .into(),
432 ])
433 .build()
434 .unwrap();
435
436 let sort_order_builder_result = SortOrder::builder()
437 .with_sort_field(
438 SortField::builder()
439 .source_id(1)
440 .direction(SortDirection::Ascending)
441 .null_order(NullOrder::First)
442 .transform(Transform::Identity)
443 .build(),
444 )
445 .build(&schema);
446
447 assert_eq!(
448 sort_order_builder_result
449 .expect_err("Expected an Err value")
450 .message(),
451 "Cannot sort by non-primitive source field: list"
452 )
453 }
454
455 #[test]
456 fn test_build_should_return_err_if_source_field_type_is_not_supported_by_transform() {
457 let schema = Schema::builder()
458 .with_schema_id(1)
459 .with_fields(vec![
460 NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
461 ])
462 .build()
463 .unwrap();
464
465 let sort_order_builder_result = SortOrder::builder()
466 .with_sort_field(
467 SortField::builder()
468 .source_id(1)
469 .direction(SortDirection::Ascending)
470 .null_order(NullOrder::First)
471 .transform(Transform::Year)
472 .build(),
473 )
474 .build(&schema);
475
476 assert_eq!(
477 sort_order_builder_result
478 .expect_err("Expected an Err value")
479 .message(),
480 "Invalid source type int for transform year"
481 )
482 }
483
484 #[test]
485 fn test_build_should_return_valid_sort_order() {
486 let schema = Schema::builder()
487 .with_schema_id(1)
488 .with_fields(vec![
489 NestedField::required(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
490 NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
491 ])
492 .build()
493 .unwrap();
494
495 let sort_field = SortField::builder()
496 .source_id(2)
497 .direction(SortDirection::Ascending)
498 .null_order(NullOrder::First)
499 .transform(Transform::Identity)
500 .build();
501
502 let sort_order_builder_result = SortOrder::builder()
503 .with_sort_field(sort_field.clone())
504 .build(&schema);
505
506 assert_eq!(
507 sort_order_builder_result.expect("Expected an Ok value"),
508 SortOrder {
509 order_id: 1,
510 fields: vec![sort_field],
511 }
512 )
513 }
514}