1use std::cmp::Ordering;
21use std::fmt::{Display, Formatter};
22use std::str::FromStr;
23
24use fnv::FnvHashSet;
25use serde::{Deserialize, Deserializer, Serialize, Serializer};
26
27use super::values::decimal_utils::decimal_from_i128_with_scale;
28use super::{Datum, PrimitiveLiteral};
29use crate::ErrorKind;
30use crate::error::{Error, Result};
31use crate::expr::{
32 BinaryExpression, BoundPredicate, BoundReference, Predicate, PredicateOperator, Reference,
33 SetExpression, UnaryExpression,
34};
35use crate::spec::Literal;
36use crate::spec::datatypes::{PrimitiveType, Type};
37use crate::transform::{BoxedTransformFunction, create_transform_function};
38
/// Transform kinds handled by this module.
///
/// Each variant has a textual form that is produced by the `Display` impl and parsed back by
/// the `FromStr` impl in this file; the serde impls use that same text.
#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
pub enum Transform {
    /// Textual form `identity`. `result_type` returns the input type unchanged and only
    /// accepts primitive inputs.
    Identity,
    /// Textual form `bucket[N]`, where `N` is the `u32` payload. `result_type` yields `int`.
    Bucket(u32),
    /// Textual form `truncate[W]`, where `W` is the `u32` payload. `result_type` returns the
    /// input type unchanged for int, long, string, binary and decimal inputs.
    Truncate(u32),
    /// Textual form `year`. `result_type` yields `int` for date and timestamp inputs.
    Year,
    /// Textual form `month`. `result_type` yields `int` for date and timestamp inputs.
    Month,
    /// Textual form `day`. `result_type` yields `date` for date and timestamp inputs.
    Day,
    /// Textual form `hour`. `result_type` yields `int` for timestamp inputs only.
    Hour,
    /// Textual form `void`. `to_human_string` always renders it as `"null"`, and it is never
    /// projected by `project` / `strict_project`.
    Void,
    /// Textual form `unknown`. `result_type` always yields `string`, and it is never projected.
    Unknown,
}
137
138impl Transform {
139 pub fn to_human_string(&self, field_type: &Type, value: Option<&Literal>) -> String {
141 let Some(value) = value else {
142 return "null".to_string();
143 };
144
145 if let Some(value) = value.as_primitive_literal() {
146 let field_type = field_type.as_primitive_type().unwrap();
147 let datum = Datum::new(field_type.clone(), value);
148
149 match self {
150 Self::Void => "null".to_string(),
151 _ => datum.to_human_string(),
152 }
153 } else {
154 "null".to_string()
155 }
156 }
157
158 pub fn result_type(&self, input_type: &Type) -> Result<Type> {
161 match self {
162 Transform::Identity => {
163 if matches!(input_type, Type::Primitive(_)) {
164 Ok(input_type.clone())
165 } else {
166 Err(Error::new(
167 ErrorKind::DataInvalid,
168 format!("{input_type} is not a valid input type of identity transform",),
169 ))
170 }
171 }
172 Transform::Void => Ok(input_type.clone()),
173 Transform::Unknown => Ok(Type::Primitive(PrimitiveType::String)),
174 Transform::Bucket(_) => {
175 if let Type::Primitive(p) = input_type {
176 match p {
177 PrimitiveType::Int
178 | PrimitiveType::Long
179 | PrimitiveType::Decimal { .. }
180 | PrimitiveType::Date
181 | PrimitiveType::Time
182 | PrimitiveType::Timestamp
183 | PrimitiveType::Timestamptz
184 | PrimitiveType::TimestampNs
185 | PrimitiveType::TimestamptzNs
186 | PrimitiveType::String
187 | PrimitiveType::Uuid
188 | PrimitiveType::Fixed(_)
189 | PrimitiveType::Binary => Ok(Type::Primitive(PrimitiveType::Int)),
190 _ => Err(Error::new(
191 ErrorKind::DataInvalid,
192 format!("{input_type} is not a valid input type of bucket transform",),
193 )),
194 }
195 } else {
196 Err(Error::new(
197 ErrorKind::DataInvalid,
198 format!("{input_type} is not a valid input type of bucket transform",),
199 ))
200 }
201 }
202 Transform::Truncate(_) => {
203 if let Type::Primitive(p) = input_type {
204 match p {
205 PrimitiveType::Int
206 | PrimitiveType::Long
207 | PrimitiveType::String
208 | PrimitiveType::Binary
209 | PrimitiveType::Decimal { .. } => Ok(input_type.clone()),
210 _ => Err(Error::new(
211 ErrorKind::DataInvalid,
212 format!("{input_type} is not a valid input type of truncate transform",),
213 )),
214 }
215 } else {
216 Err(Error::new(
217 ErrorKind::DataInvalid,
218 format!("{input_type} is not a valid input type of truncate transform",),
219 ))
220 }
221 }
222 Transform::Year | Transform::Month => {
223 if let Type::Primitive(p) = input_type {
224 match p {
225 PrimitiveType::Timestamp
226 | PrimitiveType::Timestamptz
227 | PrimitiveType::TimestampNs
228 | PrimitiveType::TimestamptzNs
229 | PrimitiveType::Date => Ok(Type::Primitive(PrimitiveType::Int)),
230 _ => Err(Error::new(
231 ErrorKind::DataInvalid,
232 format!("{input_type} is not a valid input type of {self} transform",),
233 )),
234 }
235 } else {
236 Err(Error::new(
237 ErrorKind::DataInvalid,
238 format!("{input_type} is not a valid input type of {self} transform",),
239 ))
240 }
241 }
242 Transform::Day => {
243 if let Type::Primitive(p) = input_type {
244 match p {
245 PrimitiveType::Timestamp
246 | PrimitiveType::Timestamptz
247 | PrimitiveType::TimestampNs
248 | PrimitiveType::TimestamptzNs
249 | PrimitiveType::Date => Ok(Type::Primitive(PrimitiveType::Date)),
250 _ => Err(Error::new(
251 ErrorKind::DataInvalid,
252 format!("{input_type} is not a valid input type of {self} transform",),
253 )),
254 }
255 } else {
256 Err(Error::new(
257 ErrorKind::DataInvalid,
258 format!("{input_type} is not a valid input type of {self} transform",),
259 ))
260 }
261 }
262 Transform::Hour => {
263 if let Type::Primitive(p) = input_type {
264 match p {
265 PrimitiveType::Timestamp
266 | PrimitiveType::Timestamptz
267 | PrimitiveType::TimestampNs
268 | PrimitiveType::TimestamptzNs => Ok(Type::Primitive(PrimitiveType::Int)),
269 _ => Err(Error::new(
270 ErrorKind::DataInvalid,
271 format!("{input_type} is not a valid input type of {self} transform",),
272 )),
273 }
274 } else {
275 Err(Error::new(
276 ErrorKind::DataInvalid,
277 format!("{input_type} is not a valid input type of {self} transform",),
278 ))
279 }
280 }
281 }
282 }
283
284 pub fn preserves_order(&self) -> bool {
286 !matches!(
287 self,
288 Transform::Void | Transform::Bucket(_) | Transform::Unknown
289 )
290 }
291
292 pub fn dedup_name(&self) -> String {
295 match self {
296 Transform::Year | Transform::Month | Transform::Day | Transform::Hour => {
297 "time".to_string()
298 }
299 _ => format!("{self}"),
300 }
301 }
302
303 pub fn satisfies_order_of(&self, other: &Self) -> bool {
309 match self {
310 Transform::Identity => other.preserves_order(),
311 Transform::Hour => matches!(
312 other,
313 Transform::Hour | Transform::Day | Transform::Month | Transform::Year
314 ),
315 Transform::Day => matches!(other, Transform::Day | Transform::Month | Transform::Year),
316 Transform::Month => matches!(other, Transform::Month | Transform::Year),
317 _ => self == other,
318 }
319 }
320
321 pub fn strict_project(
334 &self,
335 name: &str,
336 predicate: &BoundPredicate,
337 ) -> Result<Option<Predicate>> {
338 let func = create_transform_function(self)?;
339
340 match self {
341 Transform::Identity => match predicate {
342 BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
343 BoundPredicate::Binary(expr) => Ok(Some(Predicate::Binary(BinaryExpression::new(
344 expr.op(),
345 Reference::new(name),
346 expr.literal().to_owned(),
347 )))),
348 BoundPredicate::Set(expr) => Ok(Some(Predicate::Set(SetExpression::new(
349 expr.op(),
350 Reference::new(name),
351 expr.literals().to_owned(),
352 )))),
353 _ => Ok(None),
354 },
355 Transform::Bucket(_) => match predicate {
356 BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
357 BoundPredicate::Binary(expr) => {
358 self.project_binary_expr(name, PredicateOperator::NotEq, expr, &func)
359 }
360 BoundPredicate::Set(expr) => {
361 self.project_set_expr(expr, PredicateOperator::NotIn, name, &func)
362 }
363 _ => Ok(None),
364 },
365 Transform::Truncate(width) => match predicate {
366 BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
367 BoundPredicate::Binary(expr) => {
368 if matches!(
369 expr.term().field().field_type.as_primitive_type(),
370 Some(&PrimitiveType::Int)
371 | Some(&PrimitiveType::Long)
372 | Some(&PrimitiveType::Decimal { .. })
373 ) {
374 self.truncate_number_strict(name, expr, &func)
375 } else if expr.op() == PredicateOperator::StartsWith {
376 let len = match expr.literal().literal() {
377 PrimitiveLiteral::String(s) => s.len(),
378 PrimitiveLiteral::Binary(b) => b.len(),
379 _ => {
380 return Err(Error::new(
381 ErrorKind::DataInvalid,
382 format!(
383 "Expected a string or binary literal, got: {:?}",
384 expr.literal()
385 ),
386 ));
387 }
388 };
389 match len.cmp(&(*width as usize)) {
390 Ordering::Less => Ok(Some(Predicate::Binary(BinaryExpression::new(
391 PredicateOperator::StartsWith,
392 Reference::new(name),
393 expr.literal().to_owned(),
394 )))),
395 Ordering::Equal => Ok(Some(Predicate::Binary(BinaryExpression::new(
396 PredicateOperator::Eq,
397 Reference::new(name),
398 expr.literal().to_owned(),
399 )))),
400 Ordering::Greater => Ok(None),
401 }
402 } else if expr.op() == PredicateOperator::NotStartsWith {
403 let len = match expr.literal().literal() {
404 PrimitiveLiteral::String(s) => s.len(),
405 PrimitiveLiteral::Binary(b) => b.len(),
406 _ => {
407 return Err(Error::new(
408 ErrorKind::DataInvalid,
409 format!(
410 "Expected a string or binary literal, got: {:?}",
411 expr.literal()
412 ),
413 ));
414 }
415 };
416 match len.cmp(&(*width as usize)) {
417 Ordering::Less => Ok(Some(Predicate::Binary(BinaryExpression::new(
418 PredicateOperator::NotStartsWith,
419 Reference::new(name),
420 expr.literal().to_owned(),
421 )))),
422 Ordering::Equal => Ok(Some(Predicate::Binary(BinaryExpression::new(
423 PredicateOperator::NotEq,
424 Reference::new(name),
425 expr.literal().to_owned(),
426 )))),
427 Ordering::Greater => {
428 Ok(Some(Predicate::Binary(BinaryExpression::new(
429 expr.op(),
430 Reference::new(name),
431 func.transform_literal_result(expr.literal())?,
432 ))))
433 }
434 }
435 } else {
436 self.truncate_array_strict(name, expr, &func)
437 }
438 }
439 BoundPredicate::Set(expr) => {
440 self.project_set_expr(expr, PredicateOperator::NotIn, name, &func)
441 }
442 _ => Ok(None),
443 },
444 Transform::Year | Transform::Month | Transform::Day | Transform::Hour => {
445 match predicate {
446 BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
447 BoundPredicate::Binary(expr) => self.truncate_number_strict(name, expr, &func),
448 BoundPredicate::Set(expr) => {
449 self.project_set_expr(expr, PredicateOperator::NotIn, name, &func)
450 }
451 _ => Ok(None),
452 }
453 }
454 _ => Ok(None),
455 }
456 }
457
458 pub fn project(&self, name: &str, predicate: &BoundPredicate) -> Result<Option<Predicate>> {
471 let func = create_transform_function(self)?;
472
473 match self {
474 Transform::Identity => match predicate {
475 BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
476 BoundPredicate::Binary(expr) => Ok(Some(Predicate::Binary(BinaryExpression::new(
477 expr.op(),
478 Reference::new(name),
479 expr.literal().to_owned(),
480 )))),
481 BoundPredicate::Set(expr) => Ok(Some(Predicate::Set(SetExpression::new(
482 expr.op(),
483 Reference::new(name),
484 expr.literals().to_owned(),
485 )))),
486 _ => Ok(None),
487 },
488 Transform::Bucket(_) => match predicate {
489 BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
490 BoundPredicate::Binary(expr) => {
491 self.project_binary_expr(name, PredicateOperator::Eq, expr, &func)
492 }
493 BoundPredicate::Set(expr) => {
494 self.project_set_expr(expr, PredicateOperator::In, name, &func)
495 }
496 _ => Ok(None),
497 },
498 Transform::Truncate(width) => match predicate {
499 BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
500 BoundPredicate::Binary(expr) => {
501 self.project_binary_with_adjusted_boundary(name, expr, &func, Some(*width))
502 }
503 BoundPredicate::Set(expr) => {
504 self.project_set_expr(expr, PredicateOperator::In, name, &func)
505 }
506 _ => Ok(None),
507 },
508 Transform::Year | Transform::Month | Transform::Day | Transform::Hour => {
509 match predicate {
510 BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
511 BoundPredicate::Binary(expr) => {
512 self.project_binary_with_adjusted_boundary(name, expr, &func, None)
513 }
514 BoundPredicate::Set(expr) => {
515 self.project_set_expr(expr, PredicateOperator::In, name, &func)
516 }
517 _ => Ok(None),
518 }
519 }
520 _ => Ok(None),
521 }
522 }
523
524 fn can_transform(&self, datum: &Datum) -> bool {
526 let input_type = datum.data_type().clone();
527 self.result_type(&Type::Primitive(input_type)).is_ok()
528 }
529
530 fn project_unary(op: PredicateOperator, name: &str) -> Result<Option<Predicate>> {
532 Ok(Some(Predicate::Unary(UnaryExpression::new(
533 op,
534 Reference::new(name),
535 ))))
536 }
537
538 fn project_binary_expr(
545 &self,
546 name: &str,
547 op: PredicateOperator,
548 expr: &BinaryExpression<BoundReference>,
549 func: &BoxedTransformFunction,
550 ) -> Result<Option<Predicate>> {
551 if expr.op() != op || !self.can_transform(expr.literal()) {
552 return Ok(None);
553 }
554
555 Ok(Some(Predicate::Binary(BinaryExpression::new(
556 expr.op(),
557 Reference::new(name),
558 func.transform_literal_result(expr.literal())?,
559 ))))
560 }
561
562 fn project_binary_with_adjusted_boundary(
571 &self,
572 name: &str,
573 expr: &BinaryExpression<BoundReference>,
574 func: &BoxedTransformFunction,
575 width: Option<u32>,
576 ) -> Result<Option<Predicate>> {
577 if !self.can_transform(expr.literal()) {
578 return Ok(None);
579 }
580
581 let op = &expr.op();
582 let datum = &expr.literal();
583
584 if let Some(boundary) = Self::adjust_boundary(op, datum)? {
585 let transformed_projection = func.transform_literal_result(&boundary)?;
586
587 let adjusted_projection =
588 self.adjust_time_projection(op, datum, &transformed_projection);
589
590 let adjusted_operator = Self::adjust_operator(op, datum, width);
591
592 if let Some(op) = adjusted_operator {
593 let predicate = match adjusted_projection {
594 None => Predicate::Binary(BinaryExpression::new(
595 op,
596 Reference::new(name),
597 transformed_projection,
598 )),
599 Some(AdjustedProjection::Single(d)) => {
600 Predicate::Binary(BinaryExpression::new(op, Reference::new(name), d))
601 }
602 Some(AdjustedProjection::Set(d)) => Predicate::Set(SetExpression::new(
603 PredicateOperator::In,
604 Reference::new(name),
605 d,
606 )),
607 };
608 return Ok(Some(predicate));
609 }
610 };
611
612 Ok(None)
613 }
614
615 fn project_set_expr(
618 &self,
619 expr: &SetExpression<BoundReference>,
620 op: PredicateOperator,
621 name: &str,
622 func: &BoxedTransformFunction,
623 ) -> Result<Option<Predicate>> {
624 if expr.op() != op || expr.literals().iter().any(|d| !self.can_transform(d)) {
625 return Ok(None);
626 }
627
628 let mut new_set = FnvHashSet::default();
629
630 for lit in expr.literals() {
631 let datum = func.transform_literal_result(lit)?;
632
633 if let Some(AdjustedProjection::Single(d)) =
634 self.adjust_time_projection(&op, lit, &datum)
635 {
636 new_set.insert(d);
637 };
638
639 new_set.insert(datum);
640 }
641
642 Ok(Some(Predicate::Set(SetExpression::new(
643 expr.op(),
644 Reference::new(name),
645 new_set,
646 ))))
647 }
648
649 fn adjust_boundary(op: &PredicateOperator, datum: &Datum) -> Result<Option<Datum>> {
659 let adjusted_boundary = match op {
660 PredicateOperator::LessThan => match (datum.data_type(), datum.literal()) {
661 (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => Some(Datum::int(v - 1)),
662 (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => Some(Datum::long(v - 1)),
663 (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Int128(v)) => {
664 Some(Datum::decimal(decimal_from_i128_with_scale(v - 1, 0))?)
665 }
666 (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => Some(Datum::date(v - 1)),
667 (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => {
668 Some(Datum::timestamp_micros(v - 1))
669 }
670 (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(v)) => {
671 Some(Datum::timestamptz_micros(v - 1))
672 }
673 (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => {
674 Some(Datum::timestamp_nanos(v - 1))
675 }
676 (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(v)) => {
677 Some(Datum::timestamptz_nanos(v - 1))
678 }
679 _ => Some(datum.to_owned()),
680 },
681 PredicateOperator::GreaterThan => match (datum.data_type(), datum.literal()) {
682 (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => Some(Datum::int(v + 1)),
683 (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => Some(Datum::long(v + 1)),
684 (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Int128(v)) => {
685 Some(Datum::decimal(decimal_from_i128_with_scale(v + 1, 0))?)
686 }
687 (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => Some(Datum::date(v + 1)),
688 (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => {
689 Some(Datum::timestamp_micros(v + 1))
690 }
691 (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(v)) => {
692 Some(Datum::timestamptz_micros(v + 1))
693 }
694 (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => {
695 Some(Datum::timestamp_nanos(v + 1))
696 }
697 (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(v)) => {
698 Some(Datum::timestamptz_nanos(v + 1))
699 }
700 _ => Some(datum.to_owned()),
701 },
702 PredicateOperator::Eq
703 | PredicateOperator::LessThanOrEq
704 | PredicateOperator::GreaterThanOrEq
705 | PredicateOperator::StartsWith
706 | PredicateOperator::NotStartsWith => Some(datum.to_owned()),
707 _ => None,
708 };
709
710 Ok(adjusted_boundary)
711 }
712
713 fn adjust_operator(
724 op: &PredicateOperator,
725 datum: &Datum,
726 width: Option<u32>,
727 ) -> Option<PredicateOperator> {
728 match op {
729 PredicateOperator::LessThan => Some(PredicateOperator::LessThanOrEq),
730 PredicateOperator::GreaterThan => Some(PredicateOperator::GreaterThanOrEq),
731 PredicateOperator::StartsWith => match datum.literal() {
732 PrimitiveLiteral::String(s) => {
733 if let Some(w) = width
734 && s.len() == w as usize
735 {
736 return Some(PredicateOperator::Eq);
737 };
738 Some(*op)
739 }
740 _ => Some(*op),
741 },
742 PredicateOperator::NotStartsWith => match datum.literal() {
743 PrimitiveLiteral::String(s) => {
744 if let Some(w) = width {
745 let w = w as usize;
746
747 if s.len() == w {
748 return Some(PredicateOperator::NotEq);
749 }
750
751 if s.len() < w {
752 return Some(*op);
753 }
754
755 return None;
756 };
757 Some(*op)
758 }
759 _ => Some(*op),
760 },
761 _ => Some(*op),
762 }
763 }
764
765 fn adjust_time_projection(
768 &self,
769 op: &PredicateOperator,
770 original: &Datum,
771 transformed: &Datum,
772 ) -> Option<AdjustedProjection> {
773 let should_adjust = match self {
774 Transform::Day => matches!(original.data_type(), PrimitiveType::Timestamp),
775 Transform::Year | Transform::Month => true,
776 _ => false,
777 };
778
779 if should_adjust && let &PrimitiveLiteral::Int(v) = transformed.literal() {
780 match op {
781 PredicateOperator::LessThan
782 | PredicateOperator::LessThanOrEq
783 | PredicateOperator::In => {
784 if v < 0 {
785 match self {
788 Transform::Day => {
789 return Some(AdjustedProjection::Single(Datum::date(v + 1)));
790 }
791 _ => {
792 return Some(AdjustedProjection::Single(Datum::int(v + 1)));
793 }
794 }
795 };
796 }
797 PredicateOperator::Eq => {
798 if v < 0 {
799 let new_set = FnvHashSet::from_iter(vec![
800 transformed.to_owned(),
801 {
804 match self {
805 Transform::Day => Datum::date(v + 1),
806 _ => Datum::int(v + 1),
807 }
808 },
809 ]);
810 return Some(AdjustedProjection::Set(new_set));
811 }
812 }
813 _ => {
814 return None;
815 }
816 }
817 };
818 None
819 }
820
821 #[inline]
824 fn try_increment_number(datum: &Datum) -> Result<Datum> {
825 match (datum.data_type(), datum.literal()) {
826 (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => Ok(Datum::int(v + 1)),
827 (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => Ok(Datum::long(v + 1)),
828 (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Int128(v)) => {
829 Datum::decimal(decimal_from_i128_with_scale(v + 1, 0))
830 }
831 (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => Ok(Datum::date(v + 1)),
832 (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => {
833 Ok(Datum::timestamp_micros(v + 1))
834 }
835 (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => {
836 Ok(Datum::timestamp_nanos(v + 1))
837 }
838 (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(v)) => {
839 Ok(Datum::timestamptz_micros(v + 1))
840 }
841 (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(v)) => {
842 Ok(Datum::timestamptz_nanos(v + 1))
843 }
844 (PrimitiveType::Int, _)
845 | (PrimitiveType::Long, _)
846 | (PrimitiveType::Decimal { .. }, _)
847 | (PrimitiveType::Date, _)
848 | (PrimitiveType::Timestamp, _) => Err(Error::new(
849 ErrorKind::Unexpected,
850 format!(
851 "Unsupported literal increment for type: {:?}",
852 datum.data_type()
853 ),
854 )),
855 _ => Ok(datum.to_owned()),
856 }
857 }
858
859 #[inline]
862 fn try_decrement_number(datum: &Datum) -> Result<Datum> {
863 match (datum.data_type(), datum.literal()) {
864 (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => Ok(Datum::int(v - 1)),
865 (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => Ok(Datum::long(v - 1)),
866 (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Int128(v)) => {
867 Datum::decimal(decimal_from_i128_with_scale(v - 1, 0))
868 }
869 (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => Ok(Datum::date(v - 1)),
870 (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => {
871 Ok(Datum::timestamp_micros(v - 1))
872 }
873 (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => {
874 Ok(Datum::timestamp_nanos(v - 1))
875 }
876 (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(v)) => {
877 Ok(Datum::timestamptz_micros(v - 1))
878 }
879 (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(v)) => {
880 Ok(Datum::timestamptz_nanos(v - 1))
881 }
882 (PrimitiveType::Int, _)
883 | (PrimitiveType::Long, _)
884 | (PrimitiveType::Decimal { .. }, _)
885 | (PrimitiveType::Date, _)
886 | (PrimitiveType::Timestamp, _) => Err(Error::new(
887 ErrorKind::Unexpected,
888 format!(
889 "Unsupported literal decrement for type: {:?}",
890 datum.data_type()
891 ),
892 )),
893 _ => Ok(datum.to_owned()),
894 }
895 }
896
897 fn truncate_number_strict(
898 &self,
899 name: &str,
900 expr: &BinaryExpression<BoundReference>,
901 func: &BoxedTransformFunction,
902 ) -> Result<Option<Predicate>> {
903 let boundary = expr.literal();
904
905 if !matches!(
906 boundary.data_type(),
907 &PrimitiveType::Int
908 | &PrimitiveType::Long
909 | &PrimitiveType::Decimal { .. }
910 | &PrimitiveType::Date
911 | &PrimitiveType::Timestamp
912 | &PrimitiveType::Timestamptz
913 | &PrimitiveType::TimestampNs
914 | &PrimitiveType::TimestamptzNs
915 ) {
916 return Err(Error::new(
917 ErrorKind::DataInvalid,
918 format!("Expected a numeric literal, got: {boundary:?}"),
919 ));
920 }
921
922 let predicate = match expr.op() {
923 PredicateOperator::LessThan => Some(Predicate::Binary(BinaryExpression::new(
924 PredicateOperator::LessThan,
925 Reference::new(name),
926 func.transform_literal_result(boundary)?,
927 ))),
928 PredicateOperator::LessThanOrEq => Some(Predicate::Binary(BinaryExpression::new(
929 PredicateOperator::LessThan,
930 Reference::new(name),
931 func.transform_literal_result(&Self::try_increment_number(boundary)?)?,
932 ))),
933 PredicateOperator::GreaterThan => Some(Predicate::Binary(BinaryExpression::new(
934 PredicateOperator::GreaterThan,
935 Reference::new(name),
936 func.transform_literal_result(boundary)?,
937 ))),
938 PredicateOperator::GreaterThanOrEq => Some(Predicate::Binary(BinaryExpression::new(
939 PredicateOperator::GreaterThan,
940 Reference::new(name),
941 func.transform_literal_result(&Self::try_decrement_number(boundary)?)?,
942 ))),
943 PredicateOperator::NotEq => Some(Predicate::Binary(BinaryExpression::new(
944 PredicateOperator::NotEq,
945 Reference::new(name),
946 func.transform_literal_result(boundary)?,
947 ))),
948 _ => None,
949 };
950
951 Ok(predicate)
952 }
953
954 fn truncate_array_strict(
955 &self,
956 name: &str,
957 expr: &BinaryExpression<BoundReference>,
958 func: &BoxedTransformFunction,
959 ) -> Result<Option<Predicate>> {
960 let boundary = expr.literal();
961
962 match expr.op() {
963 PredicateOperator::LessThan | PredicateOperator::LessThanOrEq => {
964 Ok(Some(Predicate::Binary(BinaryExpression::new(
965 PredicateOperator::LessThan,
966 Reference::new(name),
967 func.transform_literal_result(boundary)?,
968 ))))
969 }
970 PredicateOperator::GreaterThan | PredicateOperator::GreaterThanOrEq => {
971 Ok(Some(Predicate::Binary(BinaryExpression::new(
972 PredicateOperator::GreaterThan,
973 Reference::new(name),
974 func.transform_literal_result(boundary)?,
975 ))))
976 }
977 PredicateOperator::NotEq => Ok(Some(Predicate::Binary(BinaryExpression::new(
978 PredicateOperator::NotEq,
979 Reference::new(name),
980 func.transform_literal_result(boundary)?,
981 )))),
982 _ => Ok(None),
983 }
984 }
985}
986
987impl Display for Transform {
988 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
989 match self {
990 Transform::Identity => write!(f, "identity"),
991 Transform::Year => write!(f, "year"),
992 Transform::Month => write!(f, "month"),
993 Transform::Day => write!(f, "day"),
994 Transform::Hour => write!(f, "hour"),
995 Transform::Void => write!(f, "void"),
996 Transform::Bucket(length) => write!(f, "bucket[{length}]"),
997 Transform::Truncate(width) => write!(f, "truncate[{width}]"),
998 Transform::Unknown => write!(f, "unknown"),
999 }
1000 }
1001}
1002
1003impl FromStr for Transform {
1004 type Err = Error;
1005
1006 fn from_str(s: &str) -> Result<Self> {
1007 let t = match s {
1008 "identity" => Transform::Identity,
1009 "year" => Transform::Year,
1010 "month" => Transform::Month,
1011 "day" => Transform::Day,
1012 "hour" => Transform::Hour,
1013 "void" => Transform::Void,
1014 "unknown" => Transform::Unknown,
1015 v if v.starts_with("bucket") => {
1016 let length = v
1017 .strip_prefix("bucket")
1018 .expect("transform must starts with `bucket`")
1019 .trim_start_matches('[')
1020 .trim_end_matches(']')
1021 .parse()
1022 .map_err(|err| {
1023 Error::new(
1024 ErrorKind::DataInvalid,
1025 format!("transform bucket type {v:?} is invalid"),
1026 )
1027 .with_source(err)
1028 })?;
1029
1030 Transform::Bucket(length)
1031 }
1032 v if v.starts_with("truncate") => {
1033 let width = v
1034 .strip_prefix("truncate")
1035 .expect("transform must starts with `truncate`")
1036 .trim_start_matches('[')
1037 .trim_end_matches(']')
1038 .parse()
1039 .map_err(|err| {
1040 Error::new(
1041 ErrorKind::DataInvalid,
1042 format!("transform truncate type {v:?} is invalid"),
1043 )
1044 .with_source(err)
1045 })?;
1046
1047 Transform::Truncate(width)
1048 }
1049 v => {
1050 return Err(Error::new(
1051 ErrorKind::DataInvalid,
1052 format!("transform {v:?} is invalid"),
1053 ));
1054 }
1055 };
1056
1057 Ok(t)
1058 }
1059}
1060
1061impl Serialize for Transform {
1062 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1063 where S: Serializer {
1064 serializer.serialize_str(format!("{self}").as_str())
1065 }
1066}
1067
1068impl<'de> Deserialize<'de> for Transform {
1069 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
1070 where D: Deserializer<'de> {
1071 let s = String::deserialize(deserializer)?;
1072 s.parse().map_err(<D::Error as serde::de::Error>::custom)
1073 }
1074}
1075
/// Correction produced by `Transform::adjust_time_projection` for a transformed value.
#[derive(Debug)]
enum AdjustedProjection {
    /// A single corrected value; callers either use it in place of the transformed value
    /// (binary predicates) or add it next to the transformed value (set predicates).
    Single(Datum),
    /// A set of candidate values; binary projection turns it into an `In` predicate.
    Set(FnvHashSet<Datum>),
}
1083
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `adjust_boundary` maps `input` to `expected` under `op`.
    fn check_boundary(op: PredicateOperator, input: Datum, expected: Datum) {
        let adjusted = Transform::adjust_boundary(&op, &input).unwrap().unwrap();
        assert_eq!(adjusted, expected);
    }

    /// Exclusive comparisons shift timestamp boundaries by one unit; inclusive ones keep them.
    #[test]
    fn test_adjust_boundary_timestamp_types() {
        // (original, expected for `<`, expected for `>`)
        let cases = [
            (
                Datum::timestamptz_micros(1000),
                Datum::timestamptz_micros(999),
                Datum::timestamptz_micros(1001),
            ),
            (
                Datum::timestamp_nanos(5000),
                Datum::timestamp_nanos(4999),
                Datum::timestamp_nanos(5001),
            ),
            (
                Datum::timestamptz_nanos(5000),
                Datum::timestamptz_nanos(4999),
                Datum::timestamptz_nanos(5001),
            ),
        ];

        for (original, below, above) in cases {
            let expectations = [
                (PredicateOperator::LessThan, below),
                (PredicateOperator::GreaterThan, above),
                (PredicateOperator::LessThanOrEq, original.clone()),
                (PredicateOperator::GreaterThanOrEq, original.clone()),
            ];

            for (op, expected) in expectations {
                check_boundary(op, original.clone(), expected);
            }
        }
    }
}