iceberg/spec/
transform.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Transforms in iceberg.
19
20use std::cmp::Ordering;
21use std::fmt::{Display, Formatter};
22use std::str::FromStr;
23
24use fnv::FnvHashSet;
25use serde::{Deserialize, Deserializer, Serialize, Serializer};
26
27use super::values::decimal_utils::decimal_from_i128_with_scale;
28use super::{Datum, PrimitiveLiteral};
29use crate::ErrorKind;
30use crate::error::{Error, Result};
31use crate::expr::{
32    BinaryExpression, BoundPredicate, BoundReference, Predicate, PredicateOperator, Reference,
33    SetExpression, UnaryExpression,
34};
35use crate::spec::Literal;
36use crate::spec::datatypes::{PrimitiveType, Type};
37use crate::transform::{BoxedTransformFunction, create_transform_function};
38
/// Transform is used to transform predicates to partition predicates,
/// in addition to transforming data values.
///
/// Deriving partition predicates from column predicates on the table data
/// is used to separate the logical queries from physical storage: the
/// partitioning can change and the correct partition filters are always
/// derived from column predicates.
///
/// This simplifies queries because users don't have to supply both logical
/// predicates and partition predicates.
///
/// All transforms must return `null` for a `null` input value.
#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
pub enum Transform {
    /// Source value, unmodified
    ///
    /// - Source type could be any primitive type
    ///   ([`Transform::result_type`] rejects non-primitive inputs).
    /// - Return type is the same as the source type.
    Identity,
    /// Hash of value, mod `N`.
    ///
    /// Bucket partition transforms use a 32-bit hash of the source value.
    /// The 32-bit hash implementation is the 32-bit Murmur3 hash, x86
    /// variant, seeded with 0.
    ///
    /// Transforms are parameterized by a number of buckets, N. The hash mod
    /// N must produce a positive value by first discarding the sign bit of
    /// the hash value. In pseudo-code, the function is:
    ///
    /// ```text
    /// def bucket_N(x) = (murmur3_x86_32_hash(x) & Integer.MAX_VALUE) % N
    /// ```
    ///
    /// - Source type could be `int`, `long`, `decimal`, `date`, `time`,
    ///   `timestamp`, `timestamptz`, `timestamp_ns`, `timestamptz_ns`,
    ///   `string`, `uuid`, `fixed`, `binary`.
    /// - Return type is `int`.
    Bucket(u32),
    /// Value truncated to width `W`
    ///
    /// For `int`:
    ///
    /// - `v - (v % W)`
    /// - example: W=10: 1 → 0, -1 → -10
    /// - note: The remainder, v % W, must be positive.
    ///
    /// For `long`:
    ///
    /// - `v - (v % W)`
    /// - example: W=10: 1 → 0, -1 → -10
    /// - note: The remainder, v % W, must be positive.
    ///
    /// For `decimal`:
    ///
    /// - `scaled_W = decimal(W, scale(v))`, then `v - (v % scaled_W)`
    /// - example: W=50, s=2: 10.65 → 10.50
    ///
    /// For `string`:
    ///
    /// - Substring of length L: `v.substring(0, L)`
    /// - example: L=3: iceberg → ice
    /// - note: Strings are truncated to a valid UTF-8 string with no more
    ///   than L code points.
    ///
    /// - Source type could be `int`, `long`, `decimal`, `string`, `binary`
    /// - Return type is the same as the source type.
    Truncate(u32),
    /// Extract a date or timestamp year, as years from 1970
    ///
    /// - Source type could be `date`, `timestamp`, `timestamptz`,
    ///   `timestamp_ns`, `timestamptz_ns`
    /// - Return type is `int`
    Year,
    /// Extract a date or timestamp month, as months from 1970-01-01
    ///
    /// - Source type could be `date`, `timestamp`, `timestamptz`,
    ///   `timestamp_ns`, `timestamptz_ns`
    /// - Return type is `int`
    Month,
    /// Extract a date or timestamp day, as days from 1970-01-01
    ///
    /// - Source type could be `date`, `timestamp`, `timestamptz`,
    ///   `timestamp_ns`, `timestamptz_ns`
    /// - Return type is a day count; [`Transform::result_type`] reports it
    ///   as `date` rather than `int`.
    Day,
    /// Extract a timestamp hour, as hours from 1970-01-01 00:00:00
    ///
    /// - Source type could be `timestamp`, `timestamptz`, `timestamp_ns`,
    ///   `timestamptz_ns`
    /// - Return type is `int`
    Hour,
    /// Always produces `null`
    ///
    /// The void transform may be used to replace the transform in an
    /// existing partition field so that the field is effectively dropped in
    /// v1 tables.
    ///
    /// - Source type could be any type.
    /// - Return type is the source type.
    Void,
    /// Used to represent some customized transform that can't be recognized or supported now.
    ///
    /// [`Transform::result_type`] reports `string` for this transform,
    /// whatever the source type is.
    Unknown,
}
137
138impl Transform {
139    /// Returns a human-readable String representation of a transformed value.
140    pub fn to_human_string(&self, field_type: &Type, value: Option<&Literal>) -> String {
141        let Some(value) = value else {
142            return "null".to_string();
143        };
144
145        if let Some(value) = value.as_primitive_literal() {
146            let field_type = field_type.as_primitive_type().unwrap();
147            let datum = Datum::new(field_type.clone(), value);
148
149            match self {
150                Self::Void => "null".to_string(),
151                _ => datum.to_human_string(),
152            }
153        } else {
154            "null".to_string()
155        }
156    }
157
158    /// Get the return type of transform given the input type.
159    /// Returns `None` if it can't be transformed.
160    pub fn result_type(&self, input_type: &Type) -> Result<Type> {
161        match self {
162            Transform::Identity => {
163                if matches!(input_type, Type::Primitive(_)) {
164                    Ok(input_type.clone())
165                } else {
166                    Err(Error::new(
167                        ErrorKind::DataInvalid,
168                        format!("{input_type} is not a valid input type of identity transform",),
169                    ))
170                }
171            }
172            Transform::Void => Ok(input_type.clone()),
173            Transform::Unknown => Ok(Type::Primitive(PrimitiveType::String)),
174            Transform::Bucket(_) => {
175                if let Type::Primitive(p) = input_type {
176                    match p {
177                        PrimitiveType::Int
178                        | PrimitiveType::Long
179                        | PrimitiveType::Decimal { .. }
180                        | PrimitiveType::Date
181                        | PrimitiveType::Time
182                        | PrimitiveType::Timestamp
183                        | PrimitiveType::Timestamptz
184                        | PrimitiveType::TimestampNs
185                        | PrimitiveType::TimestamptzNs
186                        | PrimitiveType::String
187                        | PrimitiveType::Uuid
188                        | PrimitiveType::Fixed(_)
189                        | PrimitiveType::Binary => Ok(Type::Primitive(PrimitiveType::Int)),
190                        _ => Err(Error::new(
191                            ErrorKind::DataInvalid,
192                            format!("{input_type} is not a valid input type of bucket transform",),
193                        )),
194                    }
195                } else {
196                    Err(Error::new(
197                        ErrorKind::DataInvalid,
198                        format!("{input_type} is not a valid input type of bucket transform",),
199                    ))
200                }
201            }
202            Transform::Truncate(_) => {
203                if let Type::Primitive(p) = input_type {
204                    match p {
205                        PrimitiveType::Int
206                        | PrimitiveType::Long
207                        | PrimitiveType::String
208                        | PrimitiveType::Binary
209                        | PrimitiveType::Decimal { .. } => Ok(input_type.clone()),
210                        _ => Err(Error::new(
211                            ErrorKind::DataInvalid,
212                            format!("{input_type} is not a valid input type of truncate transform",),
213                        )),
214                    }
215                } else {
216                    Err(Error::new(
217                        ErrorKind::DataInvalid,
218                        format!("{input_type} is not a valid input type of truncate transform",),
219                    ))
220                }
221            }
222            Transform::Year | Transform::Month => {
223                if let Type::Primitive(p) = input_type {
224                    match p {
225                        PrimitiveType::Timestamp
226                        | PrimitiveType::Timestamptz
227                        | PrimitiveType::TimestampNs
228                        | PrimitiveType::TimestamptzNs
229                        | PrimitiveType::Date => Ok(Type::Primitive(PrimitiveType::Int)),
230                        _ => Err(Error::new(
231                            ErrorKind::DataInvalid,
232                            format!("{input_type} is not a valid input type of {self} transform",),
233                        )),
234                    }
235                } else {
236                    Err(Error::new(
237                        ErrorKind::DataInvalid,
238                        format!("{input_type} is not a valid input type of {self} transform",),
239                    ))
240                }
241            }
242            Transform::Day => {
243                if let Type::Primitive(p) = input_type {
244                    match p {
245                        PrimitiveType::Timestamp
246                        | PrimitiveType::Timestamptz
247                        | PrimitiveType::TimestampNs
248                        | PrimitiveType::TimestamptzNs
249                        | PrimitiveType::Date => Ok(Type::Primitive(PrimitiveType::Date)),
250                        _ => Err(Error::new(
251                            ErrorKind::DataInvalid,
252                            format!("{input_type} is not a valid input type of {self} transform",),
253                        )),
254                    }
255                } else {
256                    Err(Error::new(
257                        ErrorKind::DataInvalid,
258                        format!("{input_type} is not a valid input type of {self} transform",),
259                    ))
260                }
261            }
262            Transform::Hour => {
263                if let Type::Primitive(p) = input_type {
264                    match p {
265                        PrimitiveType::Timestamp
266                        | PrimitiveType::Timestamptz
267                        | PrimitiveType::TimestampNs
268                        | PrimitiveType::TimestamptzNs => Ok(Type::Primitive(PrimitiveType::Int)),
269                        _ => Err(Error::new(
270                            ErrorKind::DataInvalid,
271                            format!("{input_type} is not a valid input type of {self} transform",),
272                        )),
273                    }
274                } else {
275                    Err(Error::new(
276                        ErrorKind::DataInvalid,
277                        format!("{input_type} is not a valid input type of {self} transform",),
278                    ))
279                }
280            }
281        }
282    }
283
284    /// Whether the transform preserves the order of values.
285    pub fn preserves_order(&self) -> bool {
286        !matches!(
287            self,
288            Transform::Void | Transform::Bucket(_) | Transform::Unknown
289        )
290    }
291
292    /// Return the unique transform name to check if similar transforms for the same source field
293    /// are added multiple times in partition spec builder.
294    pub fn dedup_name(&self) -> String {
295        match self {
296            Transform::Year | Transform::Month | Transform::Day | Transform::Hour => {
297                "time".to_string()
298            }
299            _ => format!("{self}"),
300        }
301    }
302
303    /// Whether ordering by this transform's result satisfies the ordering of another transform's
304    /// result.
305    ///
306    /// For example, sorting by day(ts) will produce an ordering that is also by month(ts) or
307    ///  year(ts). However, sorting by day(ts) will not satisfy the order of hour(ts) or identity(ts).
308    pub fn satisfies_order_of(&self, other: &Self) -> bool {
309        match self {
310            Transform::Identity => other.preserves_order(),
311            Transform::Hour => matches!(
312                other,
313                Transform::Hour | Transform::Day | Transform::Month | Transform::Year
314            ),
315            Transform::Day => matches!(other, Transform::Day | Transform::Month | Transform::Year),
316            Transform::Month => matches!(other, Transform::Month | Transform::Year),
317            _ => self == other,
318        }
319    }
320
321    /// Strictly projects a given predicate according to the transformation
322    /// specified by the `Transform` instance.
323    ///
324    /// This method ensures that the projected predicate is strictly aligned
325    /// with the transformation logic, providing a more precise filtering
326    /// mechanism for transformed data.
327    ///
328    /// # Example
329    /// Suppose, we have row filter `a = 10`, and a partition spec
330    /// `bucket(a, 37) as bs`, if one row matches `a = 10`, then its partition
331    /// value should match `bucket(10, 37) as bs`, and we project `a = 10` to
332    /// `bs = bucket(10, 37)`
333    pub fn strict_project(
334        &self,
335        name: &str,
336        predicate: &BoundPredicate,
337    ) -> Result<Option<Predicate>> {
338        let func = create_transform_function(self)?;
339
340        match self {
341            Transform::Identity => match predicate {
342                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
343                BoundPredicate::Binary(expr) => Ok(Some(Predicate::Binary(BinaryExpression::new(
344                    expr.op(),
345                    Reference::new(name),
346                    expr.literal().to_owned(),
347                )))),
348                BoundPredicate::Set(expr) => Ok(Some(Predicate::Set(SetExpression::new(
349                    expr.op(),
350                    Reference::new(name),
351                    expr.literals().to_owned(),
352                )))),
353                _ => Ok(None),
354            },
355            Transform::Bucket(_) => match predicate {
356                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
357                BoundPredicate::Binary(expr) => {
358                    self.project_binary_expr(name, PredicateOperator::NotEq, expr, &func)
359                }
360                BoundPredicate::Set(expr) => {
361                    self.project_set_expr(expr, PredicateOperator::NotIn, name, &func)
362                }
363                _ => Ok(None),
364            },
365            Transform::Truncate(width) => match predicate {
366                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
367                BoundPredicate::Binary(expr) => {
368                    if matches!(
369                        expr.term().field().field_type.as_primitive_type(),
370                        Some(&PrimitiveType::Int)
371                            | Some(&PrimitiveType::Long)
372                            | Some(&PrimitiveType::Decimal { .. })
373                    ) {
374                        self.truncate_number_strict(name, expr, &func)
375                    } else if expr.op() == PredicateOperator::StartsWith {
376                        let len = match expr.literal().literal() {
377                            PrimitiveLiteral::String(s) => s.len(),
378                            PrimitiveLiteral::Binary(b) => b.len(),
379                            _ => {
380                                return Err(Error::new(
381                                    ErrorKind::DataInvalid,
382                                    format!(
383                                        "Expected a string or binary literal, got: {:?}",
384                                        expr.literal()
385                                    ),
386                                ));
387                            }
388                        };
389                        match len.cmp(&(*width as usize)) {
390                            Ordering::Less => Ok(Some(Predicate::Binary(BinaryExpression::new(
391                                PredicateOperator::StartsWith,
392                                Reference::new(name),
393                                expr.literal().to_owned(),
394                            )))),
395                            Ordering::Equal => Ok(Some(Predicate::Binary(BinaryExpression::new(
396                                PredicateOperator::Eq,
397                                Reference::new(name),
398                                expr.literal().to_owned(),
399                            )))),
400                            Ordering::Greater => Ok(None),
401                        }
402                    } else if expr.op() == PredicateOperator::NotStartsWith {
403                        let len = match expr.literal().literal() {
404                            PrimitiveLiteral::String(s) => s.len(),
405                            PrimitiveLiteral::Binary(b) => b.len(),
406                            _ => {
407                                return Err(Error::new(
408                                    ErrorKind::DataInvalid,
409                                    format!(
410                                        "Expected a string or binary literal, got: {:?}",
411                                        expr.literal()
412                                    ),
413                                ));
414                            }
415                        };
416                        match len.cmp(&(*width as usize)) {
417                            Ordering::Less => Ok(Some(Predicate::Binary(BinaryExpression::new(
418                                PredicateOperator::NotStartsWith,
419                                Reference::new(name),
420                                expr.literal().to_owned(),
421                            )))),
422                            Ordering::Equal => Ok(Some(Predicate::Binary(BinaryExpression::new(
423                                PredicateOperator::NotEq,
424                                Reference::new(name),
425                                expr.literal().to_owned(),
426                            )))),
427                            Ordering::Greater => {
428                                Ok(Some(Predicate::Binary(BinaryExpression::new(
429                                    expr.op(),
430                                    Reference::new(name),
431                                    func.transform_literal_result(expr.literal())?,
432                                ))))
433                            }
434                        }
435                    } else {
436                        self.truncate_array_strict(name, expr, &func)
437                    }
438                }
439                BoundPredicate::Set(expr) => {
440                    self.project_set_expr(expr, PredicateOperator::NotIn, name, &func)
441                }
442                _ => Ok(None),
443            },
444            Transform::Year | Transform::Month | Transform::Day | Transform::Hour => {
445                match predicate {
446                    BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
447                    BoundPredicate::Binary(expr) => self.truncate_number_strict(name, expr, &func),
448                    BoundPredicate::Set(expr) => {
449                        self.project_set_expr(expr, PredicateOperator::NotIn, name, &func)
450                    }
451                    _ => Ok(None),
452                }
453            }
454            _ => Ok(None),
455        }
456    }
457
458    /// Projects a given predicate according to the transformation
459    /// specified by the `Transform` instance.
460    ///
461    /// This allows predicates to be effectively applied to data
462    /// that has undergone transformation, enabling efficient querying
463    /// and filtering based on the original, untransformed data.
464    ///
465    /// # Example
466    /// Suppose, we have row filter `a = 10`, and a partition spec
467    /// `bucket(a, 37) as bs`, if one row matches `a = 10`, then its partition
468    /// value should match `bucket(10, 37) as bs`, and we project `a = 10` to
469    /// `bs = bucket(10, 37)`
470    pub fn project(&self, name: &str, predicate: &BoundPredicate) -> Result<Option<Predicate>> {
471        let func = create_transform_function(self)?;
472
473        match self {
474            Transform::Identity => match predicate {
475                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
476                BoundPredicate::Binary(expr) => Ok(Some(Predicate::Binary(BinaryExpression::new(
477                    expr.op(),
478                    Reference::new(name),
479                    expr.literal().to_owned(),
480                )))),
481                BoundPredicate::Set(expr) => Ok(Some(Predicate::Set(SetExpression::new(
482                    expr.op(),
483                    Reference::new(name),
484                    expr.literals().to_owned(),
485                )))),
486                _ => Ok(None),
487            },
488            Transform::Bucket(_) => match predicate {
489                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
490                BoundPredicate::Binary(expr) => {
491                    self.project_binary_expr(name, PredicateOperator::Eq, expr, &func)
492                }
493                BoundPredicate::Set(expr) => {
494                    self.project_set_expr(expr, PredicateOperator::In, name, &func)
495                }
496                _ => Ok(None),
497            },
498            Transform::Truncate(width) => match predicate {
499                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
500                BoundPredicate::Binary(expr) => {
501                    self.project_binary_with_adjusted_boundary(name, expr, &func, Some(*width))
502                }
503                BoundPredicate::Set(expr) => {
504                    self.project_set_expr(expr, PredicateOperator::In, name, &func)
505                }
506                _ => Ok(None),
507            },
508            Transform::Year | Transform::Month | Transform::Day | Transform::Hour => {
509                match predicate {
510                    BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
511                    BoundPredicate::Binary(expr) => {
512                        self.project_binary_with_adjusted_boundary(name, expr, &func, None)
513                    }
514                    BoundPredicate::Set(expr) => {
515                        self.project_set_expr(expr, PredicateOperator::In, name, &func)
516                    }
517                    _ => Ok(None),
518                }
519            }
520            _ => Ok(None),
521        }
522    }
523
524    /// Check if `Transform` is applicable on datum's `PrimitiveType`
525    fn can_transform(&self, datum: &Datum) -> bool {
526        let input_type = datum.data_type().clone();
527        self.result_type(&Type::Primitive(input_type)).is_ok()
528    }
529
530    /// Creates a unary predicate from a given operator and a reference name.
531    fn project_unary(op: PredicateOperator, name: &str) -> Result<Option<Predicate>> {
532        Ok(Some(Predicate::Unary(UnaryExpression::new(
533            op,
534            Reference::new(name),
535        ))))
536    }
537
538    /// Attempts to create a binary predicate based on a binary expression,
539    /// if applicable.
540    ///
541    /// This method evaluates a given binary expression and, if the operation
542    /// is the given operator and the literal can be transformed, constructs a
543    /// `Predicate::Binary`variant representing the binary operation.
544    fn project_binary_expr(
545        &self,
546        name: &str,
547        op: PredicateOperator,
548        expr: &BinaryExpression<BoundReference>,
549        func: &BoxedTransformFunction,
550    ) -> Result<Option<Predicate>> {
551        if expr.op() != op || !self.can_transform(expr.literal()) {
552            return Ok(None);
553        }
554
555        Ok(Some(Predicate::Binary(BinaryExpression::new(
556            expr.op(),
557            Reference::new(name),
558            func.transform_literal_result(expr.literal())?,
559        ))))
560    }
561
562    /// Projects a binary expression to a predicate with an adjusted boundary.
563    ///
564    /// Checks if the literal within the given binary expression is
565    /// transformable. If transformable, it proceeds to potentially adjust
566    /// the boundary of the expression based on the comparison operator (`op`).
567    /// The potential adjustments involve incrementing or decrementing the
568    /// literal value and changing the `PredicateOperator` itself to its
569    /// inclusive variant.
570    fn project_binary_with_adjusted_boundary(
571        &self,
572        name: &str,
573        expr: &BinaryExpression<BoundReference>,
574        func: &BoxedTransformFunction,
575        width: Option<u32>,
576    ) -> Result<Option<Predicate>> {
577        if !self.can_transform(expr.literal()) {
578            return Ok(None);
579        }
580
581        let op = &expr.op();
582        let datum = &expr.literal();
583
584        if let Some(boundary) = Self::adjust_boundary(op, datum)? {
585            let transformed_projection = func.transform_literal_result(&boundary)?;
586
587            let adjusted_projection =
588                self.adjust_time_projection(op, datum, &transformed_projection);
589
590            let adjusted_operator = Self::adjust_operator(op, datum, width);
591
592            if let Some(op) = adjusted_operator {
593                let predicate = match adjusted_projection {
594                    None => Predicate::Binary(BinaryExpression::new(
595                        op,
596                        Reference::new(name),
597                        transformed_projection,
598                    )),
599                    Some(AdjustedProjection::Single(d)) => {
600                        Predicate::Binary(BinaryExpression::new(op, Reference::new(name), d))
601                    }
602                    Some(AdjustedProjection::Set(d)) => Predicate::Set(SetExpression::new(
603                        PredicateOperator::In,
604                        Reference::new(name),
605                        d,
606                    )),
607                };
608                return Ok(Some(predicate));
609            }
610        };
611
612        Ok(None)
613    }
614
615    /// Projects a set expression to a predicate,
616    /// applying a transformation to each literal in the set.
617    fn project_set_expr(
618        &self,
619        expr: &SetExpression<BoundReference>,
620        op: PredicateOperator,
621        name: &str,
622        func: &BoxedTransformFunction,
623    ) -> Result<Option<Predicate>> {
624        if expr.op() != op || expr.literals().iter().any(|d| !self.can_transform(d)) {
625            return Ok(None);
626        }
627
628        let mut new_set = FnvHashSet::default();
629
630        for lit in expr.literals() {
631            let datum = func.transform_literal_result(lit)?;
632
633            if let Some(AdjustedProjection::Single(d)) =
634                self.adjust_time_projection(&op, lit, &datum)
635            {
636                new_set.insert(d);
637            };
638
639            new_set.insert(datum);
640        }
641
642        Ok(Some(Predicate::Set(SetExpression::new(
643            expr.op(),
644            Reference::new(name),
645            new_set,
646        ))))
647    }
648
649    /// Adjusts the boundary value for comparison operations
650    /// based on the specified `PredicateOperator` and `Datum`.
651    ///
652    /// This function modifies the boundary value for certain comparison
653    /// operators (`LessThan`, `GreaterThan`) by incrementing or decrementing
654    /// the literal value within the given `Datum`. For operators that do not
655    /// imply a boundary shift (`Eq`, `LessThanOrEq`, `GreaterThanOrEq`,
656    /// `StartsWith`, `NotStartsWith`), the original datum is returned
657    /// unmodified.
658    fn adjust_boundary(op: &PredicateOperator, datum: &Datum) -> Result<Option<Datum>> {
659        let adjusted_boundary = match op {
660            PredicateOperator::LessThan => match (datum.data_type(), datum.literal()) {
661                (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => Some(Datum::int(v - 1)),
662                (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => Some(Datum::long(v - 1)),
663                (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Int128(v)) => {
664                    Some(Datum::decimal(decimal_from_i128_with_scale(v - 1, 0))?)
665                }
666                (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => Some(Datum::date(v - 1)),
667                (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => {
668                    Some(Datum::timestamp_micros(v - 1))
669                }
670                (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(v)) => {
671                    Some(Datum::timestamptz_micros(v - 1))
672                }
673                (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => {
674                    Some(Datum::timestamp_nanos(v - 1))
675                }
676                (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(v)) => {
677                    Some(Datum::timestamptz_nanos(v - 1))
678                }
679                _ => Some(datum.to_owned()),
680            },
681            PredicateOperator::GreaterThan => match (datum.data_type(), datum.literal()) {
682                (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => Some(Datum::int(v + 1)),
683                (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => Some(Datum::long(v + 1)),
684                (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Int128(v)) => {
685                    Some(Datum::decimal(decimal_from_i128_with_scale(v + 1, 0))?)
686                }
687                (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => Some(Datum::date(v + 1)),
688                (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => {
689                    Some(Datum::timestamp_micros(v + 1))
690                }
691                (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(v)) => {
692                    Some(Datum::timestamptz_micros(v + 1))
693                }
694                (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => {
695                    Some(Datum::timestamp_nanos(v + 1))
696                }
697                (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(v)) => {
698                    Some(Datum::timestamptz_nanos(v + 1))
699                }
700                _ => Some(datum.to_owned()),
701            },
702            PredicateOperator::Eq
703            | PredicateOperator::LessThanOrEq
704            | PredicateOperator::GreaterThanOrEq
705            | PredicateOperator::StartsWith
706            | PredicateOperator::NotStartsWith => Some(datum.to_owned()),
707            _ => None,
708        };
709
710        Ok(adjusted_boundary)
711    }
712
713    /// Adjusts the comparison operator based on the specified datum and an
714    /// optional width constraint.
715    ///
716    /// This function modifies the comparison operator for `LessThan` and
717    /// `GreaterThan` cases to their inclusive counterparts (`LessThanOrEq`,
718    /// `GreaterThanOrEq`) unconditionally. For `StartsWith` and
719    /// `NotStartsWith` operators acting on string literals, the operator may
720    /// be adjusted to `Eq` or `NotEq` if the string length matches the
721    /// specified width, indicating a precise match rather than a prefix
722    /// condition.
723    fn adjust_operator(
724        op: &PredicateOperator,
725        datum: &Datum,
726        width: Option<u32>,
727    ) -> Option<PredicateOperator> {
728        match op {
729            PredicateOperator::LessThan => Some(PredicateOperator::LessThanOrEq),
730            PredicateOperator::GreaterThan => Some(PredicateOperator::GreaterThanOrEq),
731            PredicateOperator::StartsWith => match datum.literal() {
732                PrimitiveLiteral::String(s) => {
733                    if let Some(w) = width
734                        && s.len() == w as usize
735                    {
736                        return Some(PredicateOperator::Eq);
737                    };
738                    Some(*op)
739                }
740                _ => Some(*op),
741            },
742            PredicateOperator::NotStartsWith => match datum.literal() {
743                PrimitiveLiteral::String(s) => {
744                    if let Some(w) = width {
745                        let w = w as usize;
746
747                        if s.len() == w {
748                            return Some(PredicateOperator::NotEq);
749                        }
750
751                        if s.len() < w {
752                            return Some(*op);
753                        }
754
755                        return None;
756                    };
757                    Some(*op)
758                }
759                _ => Some(*op),
760            },
761            _ => Some(*op),
762        }
763    }
764
765    /// Adjust projection for temporal transforms, align with Java
766    /// implementation: https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/transforms/ProjectionUtil.java#L275
767    fn adjust_time_projection(
768        &self,
769        op: &PredicateOperator,
770        original: &Datum,
771        transformed: &Datum,
772    ) -> Option<AdjustedProjection> {
773        let should_adjust = match self {
774            Transform::Day => matches!(original.data_type(), PrimitiveType::Timestamp),
775            Transform::Year | Transform::Month => true,
776            _ => false,
777        };
778
779        if should_adjust && let &PrimitiveLiteral::Int(v) = transformed.literal() {
780            match op {
781                PredicateOperator::LessThan
782                | PredicateOperator::LessThanOrEq
783                | PredicateOperator::In => {
784                    if v < 0 {
785                        // # TODO
786                        // An ugly hack to fix. Refine the increment and decrement logic later.
787                        match self {
788                            Transform::Day => {
789                                return Some(AdjustedProjection::Single(Datum::date(v + 1)));
790                            }
791                            _ => {
792                                return Some(AdjustedProjection::Single(Datum::int(v + 1)));
793                            }
794                        }
795                    };
796                }
797                PredicateOperator::Eq => {
798                    if v < 0 {
799                        let new_set = FnvHashSet::from_iter(vec![
800                            transformed.to_owned(),
801                            // # TODO
802                            // An ugly hack to fix. Refine the increment and decrement logic later.
803                            {
804                                match self {
805                                    Transform::Day => Datum::date(v + 1),
806                                    _ => Datum::int(v + 1),
807                                }
808                            },
809                        ]);
810                        return Some(AdjustedProjection::Set(new_set));
811                    }
812                }
813                _ => {
814                    return None;
815                }
816            }
817        };
818        None
819    }
820
821    // Increment for Int, Long, Decimal, Date, Timestamp
822    // Ignore other types
823    #[inline]
824    fn try_increment_number(datum: &Datum) -> Result<Datum> {
825        match (datum.data_type(), datum.literal()) {
826            (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => Ok(Datum::int(v + 1)),
827            (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => Ok(Datum::long(v + 1)),
828            (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Int128(v)) => {
829                Datum::decimal(decimal_from_i128_with_scale(v + 1, 0))
830            }
831            (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => Ok(Datum::date(v + 1)),
832            (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => {
833                Ok(Datum::timestamp_micros(v + 1))
834            }
835            (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => {
836                Ok(Datum::timestamp_nanos(v + 1))
837            }
838            (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(v)) => {
839                Ok(Datum::timestamptz_micros(v + 1))
840            }
841            (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(v)) => {
842                Ok(Datum::timestamptz_nanos(v + 1))
843            }
844            (PrimitiveType::Int, _)
845            | (PrimitiveType::Long, _)
846            | (PrimitiveType::Decimal { .. }, _)
847            | (PrimitiveType::Date, _)
848            | (PrimitiveType::Timestamp, _) => Err(Error::new(
849                ErrorKind::Unexpected,
850                format!(
851                    "Unsupported literal increment for type: {:?}",
852                    datum.data_type()
853                ),
854            )),
855            _ => Ok(datum.to_owned()),
856        }
857    }
858
859    // Decrement for Int, Long, Decimal, Date, Timestamp
860    // Ignore other types
861    #[inline]
862    fn try_decrement_number(datum: &Datum) -> Result<Datum> {
863        match (datum.data_type(), datum.literal()) {
864            (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => Ok(Datum::int(v - 1)),
865            (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => Ok(Datum::long(v - 1)),
866            (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Int128(v)) => {
867                Datum::decimal(decimal_from_i128_with_scale(v - 1, 0))
868            }
869            (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => Ok(Datum::date(v - 1)),
870            (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => {
871                Ok(Datum::timestamp_micros(v - 1))
872            }
873            (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => {
874                Ok(Datum::timestamp_nanos(v - 1))
875            }
876            (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(v)) => {
877                Ok(Datum::timestamptz_micros(v - 1))
878            }
879            (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(v)) => {
880                Ok(Datum::timestamptz_nanos(v - 1))
881            }
882            (PrimitiveType::Int, _)
883            | (PrimitiveType::Long, _)
884            | (PrimitiveType::Decimal { .. }, _)
885            | (PrimitiveType::Date, _)
886            | (PrimitiveType::Timestamp, _) => Err(Error::new(
887                ErrorKind::Unexpected,
888                format!(
889                    "Unsupported literal decrement for type: {:?}",
890                    datum.data_type()
891                ),
892            )),
893            _ => Ok(datum.to_owned()),
894        }
895    }
896
897    fn truncate_number_strict(
898        &self,
899        name: &str,
900        expr: &BinaryExpression<BoundReference>,
901        func: &BoxedTransformFunction,
902    ) -> Result<Option<Predicate>> {
903        let boundary = expr.literal();
904
905        if !matches!(
906            boundary.data_type(),
907            &PrimitiveType::Int
908                | &PrimitiveType::Long
909                | &PrimitiveType::Decimal { .. }
910                | &PrimitiveType::Date
911                | &PrimitiveType::Timestamp
912                | &PrimitiveType::Timestamptz
913                | &PrimitiveType::TimestampNs
914                | &PrimitiveType::TimestamptzNs
915        ) {
916            return Err(Error::new(
917                ErrorKind::DataInvalid,
918                format!("Expected a numeric literal, got: {boundary:?}"),
919            ));
920        }
921
922        let predicate = match expr.op() {
923            PredicateOperator::LessThan => Some(Predicate::Binary(BinaryExpression::new(
924                PredicateOperator::LessThan,
925                Reference::new(name),
926                func.transform_literal_result(boundary)?,
927            ))),
928            PredicateOperator::LessThanOrEq => Some(Predicate::Binary(BinaryExpression::new(
929                PredicateOperator::LessThan,
930                Reference::new(name),
931                func.transform_literal_result(&Self::try_increment_number(boundary)?)?,
932            ))),
933            PredicateOperator::GreaterThan => Some(Predicate::Binary(BinaryExpression::new(
934                PredicateOperator::GreaterThan,
935                Reference::new(name),
936                func.transform_literal_result(boundary)?,
937            ))),
938            PredicateOperator::GreaterThanOrEq => Some(Predicate::Binary(BinaryExpression::new(
939                PredicateOperator::GreaterThan,
940                Reference::new(name),
941                func.transform_literal_result(&Self::try_decrement_number(boundary)?)?,
942            ))),
943            PredicateOperator::NotEq => Some(Predicate::Binary(BinaryExpression::new(
944                PredicateOperator::NotEq,
945                Reference::new(name),
946                func.transform_literal_result(boundary)?,
947            ))),
948            _ => None,
949        };
950
951        Ok(predicate)
952    }
953
954    fn truncate_array_strict(
955        &self,
956        name: &str,
957        expr: &BinaryExpression<BoundReference>,
958        func: &BoxedTransformFunction,
959    ) -> Result<Option<Predicate>> {
960        let boundary = expr.literal();
961
962        match expr.op() {
963            PredicateOperator::LessThan | PredicateOperator::LessThanOrEq => {
964                Ok(Some(Predicate::Binary(BinaryExpression::new(
965                    PredicateOperator::LessThan,
966                    Reference::new(name),
967                    func.transform_literal_result(boundary)?,
968                ))))
969            }
970            PredicateOperator::GreaterThan | PredicateOperator::GreaterThanOrEq => {
971                Ok(Some(Predicate::Binary(BinaryExpression::new(
972                    PredicateOperator::GreaterThan,
973                    Reference::new(name),
974                    func.transform_literal_result(boundary)?,
975                ))))
976            }
977            PredicateOperator::NotEq => Ok(Some(Predicate::Binary(BinaryExpression::new(
978                PredicateOperator::NotEq,
979                Reference::new(name),
980                func.transform_literal_result(boundary)?,
981            )))),
982            _ => Ok(None),
983        }
984    }
985}
986
987impl Display for Transform {
988    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
989        match self {
990            Transform::Identity => write!(f, "identity"),
991            Transform::Year => write!(f, "year"),
992            Transform::Month => write!(f, "month"),
993            Transform::Day => write!(f, "day"),
994            Transform::Hour => write!(f, "hour"),
995            Transform::Void => write!(f, "void"),
996            Transform::Bucket(length) => write!(f, "bucket[{length}]"),
997            Transform::Truncate(width) => write!(f, "truncate[{width}]"),
998            Transform::Unknown => write!(f, "unknown"),
999        }
1000    }
1001}
1002
1003impl FromStr for Transform {
1004    type Err = Error;
1005
1006    fn from_str(s: &str) -> Result<Self> {
1007        let t = match s {
1008            "identity" => Transform::Identity,
1009            "year" => Transform::Year,
1010            "month" => Transform::Month,
1011            "day" => Transform::Day,
1012            "hour" => Transform::Hour,
1013            "void" => Transform::Void,
1014            "unknown" => Transform::Unknown,
1015            v if v.starts_with("bucket") => {
1016                let length = v
1017                    .strip_prefix("bucket")
1018                    .expect("transform must starts with `bucket`")
1019                    .trim_start_matches('[')
1020                    .trim_end_matches(']')
1021                    .parse()
1022                    .map_err(|err| {
1023                        Error::new(
1024                            ErrorKind::DataInvalid,
1025                            format!("transform bucket type {v:?} is invalid"),
1026                        )
1027                        .with_source(err)
1028                    })?;
1029
1030                Transform::Bucket(length)
1031            }
1032            v if v.starts_with("truncate") => {
1033                let width = v
1034                    .strip_prefix("truncate")
1035                    .expect("transform must starts with `truncate`")
1036                    .trim_start_matches('[')
1037                    .trim_end_matches(']')
1038                    .parse()
1039                    .map_err(|err| {
1040                        Error::new(
1041                            ErrorKind::DataInvalid,
1042                            format!("transform truncate type {v:?} is invalid"),
1043                        )
1044                        .with_source(err)
1045                    })?;
1046
1047                Transform::Truncate(width)
1048            }
1049            v => {
1050                return Err(Error::new(
1051                    ErrorKind::DataInvalid,
1052                    format!("transform {v:?} is invalid"),
1053                ));
1054            }
1055        };
1056
1057        Ok(t)
1058    }
1059}
1060
1061impl Serialize for Transform {
1062    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1063    where S: Serializer {
1064        serializer.serialize_str(format!("{self}").as_str())
1065    }
1066}
1067
1068impl<'de> Deserialize<'de> for Transform {
1069    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
1070    where D: Deserializer<'de> {
1071        let s = String::deserialize(deserializer)?;
1072        s.parse().map_err(<D::Error as serde::de::Error>::custom)
1073    }
1074}
1075
1076/// An enum representing the result of the adjusted projection.
1077/// Either being a single adjusted datum or a set.
1078#[derive(Debug)]
1079enum AdjustedProjection {
1080    Single(Datum),
1081    Set(FnvHashSet<Datum>),
1082}
1083
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that adjusting `input` for `op` yields exactly `expected`.
    fn check_boundary(op: PredicateOperator, input: Datum, expected: Datum) {
        let adjusted = Transform::adjust_boundary(&op, &input)
            .expect("adjust_boundary should not fail")
            .expect("adjust_boundary should yield a datum");
        assert_eq!(adjusted, expected);
    }

    #[test]
    fn test_adjust_boundary_timestamp_types() {
        // Each case: (input, input minus one, input plus one).
        let cases = [
            (
                Datum::timestamptz_micros(1000),
                Datum::timestamptz_micros(999),
                Datum::timestamptz_micros(1001),
            ),
            (
                Datum::timestamp_nanos(5000),
                Datum::timestamp_nanos(4999),
                Datum::timestamp_nanos(5001),
            ),
            (
                Datum::timestamptz_nanos(5000),
                Datum::timestamptz_nanos(4999),
                Datum::timestamptz_nanos(5001),
            ),
        ];

        for (datum, below, above) in cases {
            // Exclusive bounds shift by one; inclusive bounds stay put.
            let expectations = [
                (PredicateOperator::LessThan, below),
                (PredicateOperator::GreaterThan, above),
                (PredicateOperator::LessThanOrEq, datum.clone()),
                (PredicateOperator::GreaterThanOrEq, datum.clone()),
            ];

            for (op, expected) in expectations {
                check_boundary(op, datum.clone(), expected);
            }
        }
    }
}