use arrow_array::ArrayRef;
use crate::spec::{Datum, Transform};
use crate::{Error, ErrorKind, Result};
mod bucket;
mod identity;
mod temporal;
mod truncate;
mod void;
pub trait TransformFunction: Send + Sync {
fn transform(&self, input: ArrayRef) -> Result<ArrayRef>;
fn transform_literal(&self, input: &Datum) -> Result<Option<Datum>>;
fn transform_literal_result(&self, input: &Datum) -> Result<Datum> {
self.transform_literal(input)?.ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
format!("Returns 'None' for literal {}", input),
)
})
}
}
/// A heap-allocated, dynamically dispatched [`TransformFunction`], as returned
/// by [`create_transform_function`].
pub type BoxedTransformFunction = Box<dyn TransformFunction>;
pub fn create_transform_function(transform: &Transform) -> Result<BoxedTransformFunction> {
match transform {
Transform::Identity => Ok(Box::new(identity::Identity {})),
Transform::Void => Ok(Box::new(void::Void {})),
Transform::Year => Ok(Box::new(temporal::Year {})),
Transform::Month => Ok(Box::new(temporal::Month {})),
Transform::Day => Ok(Box::new(temporal::Day {})),
Transform::Hour => Ok(Box::new(temporal::Hour {})),
Transform::Bucket(mod_n) => Ok(Box::new(bucket::Bucket::new(*mod_n))),
Transform::Truncate(width) => Ok(Box::new(truncate::Truncate::new(*width))),
Transform::Unknown => Err(crate::error::Error::new(
crate::ErrorKind::FeatureUnsupported,
"Transform Unknown is not implemented",
)),
}
}
#[cfg(test)]
mod test {
use std::collections::HashSet;
use std::sync::Arc;
use crate::expr::accessor::StructAccessor;
use crate::expr::{
BinaryExpression, BoundPredicate, BoundReference, PredicateOperator, SetExpression,
};
use crate::spec::{Datum, NestedField, NestedFieldRef, PrimitiveType, Transform, Type};
use crate::Result;
/// Test helper that bundles a transform with the name and field used to build
/// bound predicates and to check the outcome of `Transform::project`.
pub(crate) struct TestProjectionFixture {
// Transform whose `project` method is exercised by `assert_projection`.
transform: Transform,
// Name given to the bound references and passed to `project`.
name: String,
// Field referenced by the predicates built by this fixture.
field: NestedFieldRef,
}
impl TestProjectionFixture {
pub(crate) fn new(
transform: Transform,
name: impl Into<String>,
field: NestedField,
) -> Self {
TestProjectionFixture {
transform,
name: name.into(),
field: Arc::new(field),
}
}
pub(crate) fn binary_predicate(
&self,
op: PredicateOperator,
literal: Datum,
) -> BoundPredicate {
BoundPredicate::Binary(BinaryExpression::new(
op,
BoundReference::new(
self.name.clone(),
self.field.clone(),
Arc::new(StructAccessor::new(1, PrimitiveType::Boolean)),
),
literal,
))
}
pub(crate) fn set_predicate(
&self,
op: PredicateOperator,
literals: Vec<Datum>,
) -> BoundPredicate {
BoundPredicate::Set(SetExpression::new(
op,
BoundReference::new(
self.name.clone(),
self.field.clone(),
Arc::new(StructAccessor::new(1, PrimitiveType::Boolean)),
),
HashSet::from_iter(literals),
))
}
pub(crate) fn assert_projection(
&self,
predicate: &BoundPredicate,
expected: Option<&str>,
) -> Result<()> {
let result = self.transform.project(&self.name, predicate)?;
match expected {
Some(exp) => assert_eq!(format!("{}", result.unwrap()), exp),
None => assert!(result.is_none()),
}
Ok(())
}
}
/// Expected properties of a `Transform`, verified in one go by
/// `assert_transform`.
pub(crate) struct TestTransformFixture {
// Expected `Display` rendering of the transform.
pub display: String,
// Expected JSON serialization; it must also deserialize back to the transform.
pub json: String,
// Expected return value of `Transform::dedup_name`.
pub dedup_name: String,
// Expected return value of `Transform::preserves_order`.
pub preserves_order: bool,
// Pairs of (other transform, expected result of `satisfies_order_of(other)`).
pub satisfies_order_of: Vec<(Transform, bool)>,
// Pairs of (input type, expected result type); `None` means `result_type`
// is expected to return an error for that input type.
pub trans_types: Vec<(Type, Option<Type>)>,
}
impl TestTransformFixture {
    /// Checks `trans` against every expectation stored in this fixture.
    ///
    /// Verified in order: `Display` output, JSON serialization, JSON
    /// round-trip, `dedup_name`, `preserves_order`, each `satisfies_order_of`
    /// pair, and each `result_type` pair (an `Err` from `result_type` is
    /// compared as `None`).
    ///
    /// # Panics
    ///
    /// Panics on the first expectation that does not hold, or when JSON
    /// (de)serialization fails.
    pub(crate) fn assert_transform(&self, trans: Transform) {
        let rendered = trans.to_string();
        assert_eq!(self.display, rendered);

        let serialized = serde_json::to_string(&trans).unwrap();
        assert_eq!(self.json, serialized);

        let round_tripped: Transform = serde_json::from_str(&self.json).unwrap();
        assert_eq!(trans, round_tripped);

        assert_eq!(self.dedup_name, trans.dedup_name());
        assert_eq!(self.preserves_order, trans.preserves_order());

        for (other, expected) in &self.satisfies_order_of {
            let actual = trans.satisfies_order_of(other);
            assert_eq!(
                expected, &actual,
                "Failed to check satisfies order {}, {}, {}",
                trans, other, expected
            );
        }

        for (input_type, expected_type) in &self.trans_types {
            // Errors are flattened to `None` so they can be compared with the
            // optional expectation.
            let actual_type = trans.result_type(input_type).ok();
            assert_eq!(expected_type, &actual_type);
        }
    }
}
}