iceberg/spec/
transform.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Transforms in iceberg.
19
20use std::cmp::Ordering;
21use std::fmt::{Display, Formatter};
22use std::str::FromStr;
23
24use fnv::FnvHashSet;
25use serde::{Deserialize, Deserializer, Serialize, Serializer};
26
27use super::values::decimal_utils::decimal_from_i128_with_scale;
28use super::{Datum, PrimitiveLiteral};
29use crate::ErrorKind;
30use crate::error::{Error, Result};
31use crate::expr::{
32    BinaryExpression, BoundPredicate, BoundReference, Predicate, PredicateOperator, Reference,
33    SetExpression, UnaryExpression,
34};
35use crate::spec::Literal;
36use crate::spec::datatypes::{PrimitiveType, Type};
37use crate::transform::{BoxedTransformFunction, create_transform_function};
38
/// Transform is used to transform predicates to partition predicates,
/// in addition to transforming data values.
///
/// Deriving partition predicates from column predicates on the table data
/// is used to separate the logical queries from physical storage: the
/// partitioning can change and the correct partition filters are always
/// derived from column predicates.
///
/// This simplifies queries because users don't have to supply both logical
/// predicates and partition predicates.
///
/// All transforms must return `null` for a `null` input value.
#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
pub enum Transform {
    /// Source value, unmodified
    ///
    /// - Source type could be any primitive type.
    /// - Return type is the same as the source type.
    Identity,
    /// Hash of value, mod `N`.
    ///
    /// Bucket partition transforms use a 32-bit hash of the source value.
    /// The 32-bit hash implementation is the 32-bit Murmur3 hash, x86
    /// variant, seeded with 0.
    ///
    /// Transforms are parameterized by a number of buckets, N. The hash mod
    /// N must produce a positive value by first discarding the sign bit of
    /// the hash value. In pseudo-code, the function is:
    ///
    /// ```text
    /// def bucket_N(x) = (murmur3_x86_32_hash(x) & Integer.MAX_VALUE) % N
    /// ```
    ///
    /// - Source type could be `int`, `long`, `decimal`, `date`, `time`,
    ///   `timestamp`, `timestamptz`, `timestamp_ns`, `timestamptz_ns`,
    ///   `string`, `uuid`, `fixed`, `binary`.
    /// - Return type is `int`.
    Bucket(u32),
    /// Value truncated to width `W`
    ///
    /// For `int`:
    ///
    /// - `v - (v % W)` remainders must be positive
    /// - example: W=10: 1 → 0, -1 → -10
    /// - note: The remainder, v % W, must be positive.
    ///
    /// For `long`:
    ///
    /// - `v - (v % W)` remainders must be positive
    /// - example: W=10: 1 → 0, -1 → -10
    /// - note: The remainder, v % W, must be positive.
    ///
    /// For `decimal`:
    ///
    /// - `scaled_W = decimal(W, scale(v))`, then `v - (v % scaled_W)`
    /// - example: W=50, s=2: 10.65 → 10.50
    ///
    /// For `string`:
    ///
    /// - Substring of length L: `v.substring(0, L)`
    /// - example: L=3: iceberg → ice
    /// - note: Strings are truncated to a valid UTF-8 string with no more
    ///   than L code points.
    ///
    /// - Source type could be `int`, `long`, `decimal`, `string`, `binary`
    /// - Return type is the same as the source type.
    Truncate(u32),
    /// Extract a date or timestamp year, as years from 1970
    ///
    /// - Source type could be `date`, `timestamp`, `timestamptz`,
    ///   `timestamp_ns`, `timestamptz_ns`
    /// - Return type is `int`
    Year,
    /// Extract a date or timestamp month, as months from 1970-01-01
    ///
    /// - Source type could be `date`, `timestamp`, `timestamptz`,
    ///   `timestamp_ns`, `timestamptz_ns`
    /// - Return type is `int`
    Month,
    /// Extract a date or timestamp day, as days from 1970-01-01
    ///
    /// - Source type could be `date`, `timestamp`, `timestamptz`,
    ///   `timestamp_ns`, `timestamptz_ns`
    /// - Return type: `result_type` reports `date` for this transform.
    Day,
    /// Extract a timestamp hour, as hours from 1970-01-01 00:00:00
    ///
    /// - Source type could be `timestamp`, `timestamptz`, `timestamp_ns`,
    ///   `timestamptz_ns`
    /// - Return type is `int`
    Hour,
    /// Always produces `null`
    ///
    /// The void transform may be used to replace the transform in an
    /// existing partition field so that the field is effectively dropped in
    /// v1 tables.
    ///
    /// - Source type could be any type.
    /// - Return type is the source type.
    Void,
    /// Used to represent some customized transform that can't be recognized or supported now.
    Unknown,
}
137
138impl Transform {
139    /// Returns a human-readable String representation of a transformed value.
140    pub fn to_human_string(&self, field_type: &Type, value: Option<&Literal>) -> String {
141        let Some(value) = value else {
142            return "null".to_string();
143        };
144
145        if let Some(value) = value.as_primitive_literal() {
146            let field_type = field_type.as_primitive_type().unwrap();
147            let datum = Datum::new(field_type.clone(), value);
148
149            match self {
150                Self::Void => "null".to_string(),
151                _ => datum.to_human_string(),
152            }
153        } else {
154            "null".to_string()
155        }
156    }
157
158    /// Get the return type of transform given the input type.
159    /// Returns `None` if it can't be transformed.
160    pub fn result_type(&self, input_type: &Type) -> Result<Type> {
161        match self {
162            Transform::Identity => {
163                if matches!(input_type, Type::Primitive(_)) {
164                    Ok(input_type.clone())
165                } else {
166                    Err(Error::new(
167                        ErrorKind::DataInvalid,
168                        format!("{input_type} is not a valid input type of identity transform",),
169                    ))
170                }
171            }
172            Transform::Void => Ok(input_type.clone()),
173            Transform::Unknown => Ok(Type::Primitive(PrimitiveType::String)),
174            Transform::Bucket(_) => {
175                if let Type::Primitive(p) = input_type {
176                    match p {
177                        PrimitiveType::Int
178                        | PrimitiveType::Long
179                        | PrimitiveType::Decimal { .. }
180                        | PrimitiveType::Date
181                        | PrimitiveType::Time
182                        | PrimitiveType::Timestamp
183                        | PrimitiveType::Timestamptz
184                        | PrimitiveType::TimestampNs
185                        | PrimitiveType::TimestamptzNs
186                        | PrimitiveType::String
187                        | PrimitiveType::Uuid
188                        | PrimitiveType::Fixed(_)
189                        | PrimitiveType::Binary => Ok(Type::Primitive(PrimitiveType::Int)),
190                        _ => Err(Error::new(
191                            ErrorKind::DataInvalid,
192                            format!("{input_type} is not a valid input type of bucket transform",),
193                        )),
194                    }
195                } else {
196                    Err(Error::new(
197                        ErrorKind::DataInvalid,
198                        format!("{input_type} is not a valid input type of bucket transform",),
199                    ))
200                }
201            }
202            Transform::Truncate(_) => {
203                if let Type::Primitive(p) = input_type {
204                    match p {
205                        PrimitiveType::Int
206                        | PrimitiveType::Long
207                        | PrimitiveType::String
208                        | PrimitiveType::Binary
209                        | PrimitiveType::Decimal { .. } => Ok(input_type.clone()),
210                        _ => Err(Error::new(
211                            ErrorKind::DataInvalid,
212                            format!("{input_type} is not a valid input type of truncate transform",),
213                        )),
214                    }
215                } else {
216                    Err(Error::new(
217                        ErrorKind::DataInvalid,
218                        format!("{input_type} is not a valid input type of truncate transform",),
219                    ))
220                }
221            }
222            Transform::Year | Transform::Month => {
223                if let Type::Primitive(p) = input_type {
224                    match p {
225                        PrimitiveType::Timestamp
226                        | PrimitiveType::Timestamptz
227                        | PrimitiveType::TimestampNs
228                        | PrimitiveType::TimestamptzNs
229                        | PrimitiveType::Date => Ok(Type::Primitive(PrimitiveType::Int)),
230                        _ => Err(Error::new(
231                            ErrorKind::DataInvalid,
232                            format!("{input_type} is not a valid input type of {self} transform",),
233                        )),
234                    }
235                } else {
236                    Err(Error::new(
237                        ErrorKind::DataInvalid,
238                        format!("{input_type} is not a valid input type of {self} transform",),
239                    ))
240                }
241            }
242            Transform::Day => {
243                if let Type::Primitive(p) = input_type {
244                    match p {
245                        PrimitiveType::Timestamp
246                        | PrimitiveType::Timestamptz
247                        | PrimitiveType::TimestampNs
248                        | PrimitiveType::TimestamptzNs
249                        | PrimitiveType::Date => Ok(Type::Primitive(PrimitiveType::Date)),
250                        _ => Err(Error::new(
251                            ErrorKind::DataInvalid,
252                            format!("{input_type} is not a valid input type of {self} transform",),
253                        )),
254                    }
255                } else {
256                    Err(Error::new(
257                        ErrorKind::DataInvalid,
258                        format!("{input_type} is not a valid input type of {self} transform",),
259                    ))
260                }
261            }
262            Transform::Hour => {
263                if let Type::Primitive(p) = input_type {
264                    match p {
265                        PrimitiveType::Timestamp
266                        | PrimitiveType::Timestamptz
267                        | PrimitiveType::TimestampNs
268                        | PrimitiveType::TimestamptzNs => Ok(Type::Primitive(PrimitiveType::Int)),
269                        _ => Err(Error::new(
270                            ErrorKind::DataInvalid,
271                            format!("{input_type} is not a valid input type of {self} transform",),
272                        )),
273                    }
274                } else {
275                    Err(Error::new(
276                        ErrorKind::DataInvalid,
277                        format!("{input_type} is not a valid input type of {self} transform",),
278                    ))
279                }
280            }
281        }
282    }
283
284    /// Whether the transform preserves the order of values.
285    pub fn preserves_order(&self) -> bool {
286        !matches!(
287            self,
288            Transform::Void | Transform::Bucket(_) | Transform::Unknown
289        )
290    }
291
292    /// Return the unique transform name to check if similar transforms for the same source field
293    /// are added multiple times in partition spec builder.
294    pub fn dedup_name(&self) -> String {
295        match self {
296            Transform::Year | Transform::Month | Transform::Day | Transform::Hour => {
297                "time".to_string()
298            }
299            _ => format!("{self}"),
300        }
301    }
302
303    /// Whether ordering by this transform's result satisfies the ordering of another transform's
304    /// result.
305    ///
306    /// For example, sorting by day(ts) will produce an ordering that is also by month(ts) or
307    ///  year(ts). However, sorting by day(ts) will not satisfy the order of hour(ts) or identity(ts).
308    pub fn satisfies_order_of(&self, other: &Self) -> bool {
309        match self {
310            Transform::Identity => other.preserves_order(),
311            Transform::Hour => matches!(
312                other,
313                Transform::Hour | Transform::Day | Transform::Month | Transform::Year
314            ),
315            Transform::Day => matches!(other, Transform::Day | Transform::Month | Transform::Year),
316            Transform::Month => matches!(other, Transform::Month | Transform::Year),
317            _ => self == other,
318        }
319    }
320
321    /// Strictly projects a given predicate according to the transformation
322    /// specified by the `Transform` instance.
323    ///
324    /// This method ensures that the projected predicate is strictly aligned
325    /// with the transformation logic, providing a more precise filtering
326    /// mechanism for transformed data.
327    ///
328    /// # Example
329    /// Suppose, we have row filter `a = 10`, and a partition spec
330    /// `bucket(a, 37) as bs`, if one row matches `a = 10`, then its partition
331    /// value should match `bucket(10, 37) as bs`, and we project `a = 10` to
332    /// `bs = bucket(10, 37)`
333    pub fn strict_project(
334        &self,
335        name: &str,
336        predicate: &BoundPredicate,
337    ) -> Result<Option<Predicate>> {
338        let func = create_transform_function(self)?;
339
340        match self {
341            Transform::Identity => match predicate {
342                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
343                BoundPredicate::Binary(expr) => Ok(Some(Predicate::Binary(BinaryExpression::new(
344                    expr.op(),
345                    Reference::new(name),
346                    expr.literal().to_owned(),
347                )))),
348                BoundPredicate::Set(expr) => Ok(Some(Predicate::Set(SetExpression::new(
349                    expr.op(),
350                    Reference::new(name),
351                    expr.literals().to_owned(),
352                )))),
353                _ => Ok(None),
354            },
355            Transform::Bucket(_) => match predicate {
356                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
357                BoundPredicate::Binary(expr) => {
358                    self.project_binary_expr(name, PredicateOperator::NotEq, expr, &func)
359                }
360                BoundPredicate::Set(expr) => {
361                    self.project_set_expr(expr, PredicateOperator::NotIn, name, &func)
362                }
363                _ => Ok(None),
364            },
365            Transform::Truncate(width) => match predicate {
366                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
367                BoundPredicate::Binary(expr) => {
368                    if matches!(
369                        expr.term().field().field_type.as_primitive_type(),
370                        Some(&PrimitiveType::Int)
371                            | Some(&PrimitiveType::Long)
372                            | Some(&PrimitiveType::Decimal { .. })
373                    ) {
374                        self.truncate_number_strict(name, expr, &func)
375                    } else if expr.op() == PredicateOperator::StartsWith {
376                        let len = match expr.literal().literal() {
377                            PrimitiveLiteral::String(s) => s.len(),
378                            PrimitiveLiteral::Binary(b) => b.len(),
379                            _ => {
380                                return Err(Error::new(
381                                    ErrorKind::DataInvalid,
382                                    format!(
383                                        "Expected a string or binary literal, got: {:?}",
384                                        expr.literal()
385                                    ),
386                                ));
387                            }
388                        };
389                        match len.cmp(&(*width as usize)) {
390                            Ordering::Less => Ok(Some(Predicate::Binary(BinaryExpression::new(
391                                PredicateOperator::StartsWith,
392                                Reference::new(name),
393                                expr.literal().to_owned(),
394                            )))),
395                            Ordering::Equal => Ok(Some(Predicate::Binary(BinaryExpression::new(
396                                PredicateOperator::Eq,
397                                Reference::new(name),
398                                expr.literal().to_owned(),
399                            )))),
400                            Ordering::Greater => Ok(None),
401                        }
402                    } else if expr.op() == PredicateOperator::NotStartsWith {
403                        let len = match expr.literal().literal() {
404                            PrimitiveLiteral::String(s) => s.len(),
405                            PrimitiveLiteral::Binary(b) => b.len(),
406                            _ => {
407                                return Err(Error::new(
408                                    ErrorKind::DataInvalid,
409                                    format!(
410                                        "Expected a string or binary literal, got: {:?}",
411                                        expr.literal()
412                                    ),
413                                ));
414                            }
415                        };
416                        match len.cmp(&(*width as usize)) {
417                            Ordering::Less => Ok(Some(Predicate::Binary(BinaryExpression::new(
418                                PredicateOperator::NotStartsWith,
419                                Reference::new(name),
420                                expr.literal().to_owned(),
421                            )))),
422                            Ordering::Equal => Ok(Some(Predicate::Binary(BinaryExpression::new(
423                                PredicateOperator::NotEq,
424                                Reference::new(name),
425                                expr.literal().to_owned(),
426                            )))),
427                            Ordering::Greater => {
428                                Ok(Some(Predicate::Binary(BinaryExpression::new(
429                                    expr.op(),
430                                    Reference::new(name),
431                                    func.transform_literal_result(expr.literal())?,
432                                ))))
433                            }
434                        }
435                    } else {
436                        self.truncate_array_strict(name, expr, &func)
437                    }
438                }
439                BoundPredicate::Set(expr) => {
440                    self.project_set_expr(expr, PredicateOperator::NotIn, name, &func)
441                }
442                _ => Ok(None),
443            },
444            Transform::Year | Transform::Month | Transform::Day | Transform::Hour => {
445                match predicate {
446                    BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
447                    BoundPredicate::Binary(expr) => self.truncate_number_strict(name, expr, &func),
448                    BoundPredicate::Set(expr) => {
449                        self.project_set_expr(expr, PredicateOperator::NotIn, name, &func)
450                    }
451                    _ => Ok(None),
452                }
453            }
454            _ => Ok(None),
455        }
456    }
457
458    /// Projects a given predicate according to the transformation
459    /// specified by the `Transform` instance.
460    ///
461    /// This allows predicates to be effectively applied to data
462    /// that has undergone transformation, enabling efficient querying
463    /// and filtering based on the original, untransformed data.
464    ///
465    /// # Example
466    /// Suppose, we have row filter `a = 10`, and a partition spec
467    /// `bucket(a, 37) as bs`, if one row matches `a = 10`, then its partition
468    /// value should match `bucket(10, 37) as bs`, and we project `a = 10` to
469    /// `bs = bucket(10, 37)`
470    pub fn project(&self, name: &str, predicate: &BoundPredicate) -> Result<Option<Predicate>> {
471        let func = create_transform_function(self)?;
472
473        match self {
474            Transform::Identity => match predicate {
475                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
476                BoundPredicate::Binary(expr) => Ok(Some(Predicate::Binary(BinaryExpression::new(
477                    expr.op(),
478                    Reference::new(name),
479                    expr.literal().to_owned(),
480                )))),
481                BoundPredicate::Set(expr) => Ok(Some(Predicate::Set(SetExpression::new(
482                    expr.op(),
483                    Reference::new(name),
484                    expr.literals().to_owned(),
485                )))),
486                _ => Ok(None),
487            },
488            Transform::Bucket(_) => match predicate {
489                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
490                BoundPredicate::Binary(expr) => {
491                    self.project_binary_expr(name, PredicateOperator::Eq, expr, &func)
492                }
493                BoundPredicate::Set(expr) => {
494                    self.project_set_expr(expr, PredicateOperator::In, name, &func)
495                }
496                _ => Ok(None),
497            },
498            Transform::Truncate(width) => match predicate {
499                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
500                BoundPredicate::Binary(expr) => {
501                    self.project_binary_with_adjusted_boundary(name, expr, &func, Some(*width))
502                }
503                BoundPredicate::Set(expr) => {
504                    self.project_set_expr(expr, PredicateOperator::In, name, &func)
505                }
506                _ => Ok(None),
507            },
508            Transform::Year | Transform::Month | Transform::Day | Transform::Hour => {
509                match predicate {
510                    BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
511                    BoundPredicate::Binary(expr) => {
512                        self.project_binary_with_adjusted_boundary(name, expr, &func, None)
513                    }
514                    BoundPredicate::Set(expr) => {
515                        self.project_set_expr(expr, PredicateOperator::In, name, &func)
516                    }
517                    _ => Ok(None),
518                }
519            }
520            _ => Ok(None),
521        }
522    }
523
524    /// Check if `Transform` is applicable on datum's `PrimitiveType`
525    fn can_transform(&self, datum: &Datum) -> bool {
526        let input_type = datum.data_type().clone();
527        self.result_type(&Type::Primitive(input_type)).is_ok()
528    }
529
530    /// Creates a unary predicate from a given operator and a reference name.
531    fn project_unary(op: PredicateOperator, name: &str) -> Result<Option<Predicate>> {
532        Ok(Some(Predicate::Unary(UnaryExpression::new(
533            op,
534            Reference::new(name),
535        ))))
536    }
537
538    /// Attempts to create a binary predicate based on a binary expression,
539    /// if applicable.
540    ///
541    /// This method evaluates a given binary expression and, if the operation
542    /// is the given operator and the literal can be transformed, constructs a
543    /// `Predicate::Binary`variant representing the binary operation.
544    fn project_binary_expr(
545        &self,
546        name: &str,
547        op: PredicateOperator,
548        expr: &BinaryExpression<BoundReference>,
549        func: &BoxedTransformFunction,
550    ) -> Result<Option<Predicate>> {
551        if expr.op() != op || !self.can_transform(expr.literal()) {
552            return Ok(None);
553        }
554
555        Ok(Some(Predicate::Binary(BinaryExpression::new(
556            expr.op(),
557            Reference::new(name),
558            func.transform_literal_result(expr.literal())?,
559        ))))
560    }
561
562    /// Projects a binary expression to a predicate with an adjusted boundary.
563    ///
564    /// Checks if the literal within the given binary expression is
565    /// transformable. If transformable, it proceeds to potentially adjust
566    /// the boundary of the expression based on the comparison operator (`op`).
567    /// The potential adjustments involve incrementing or decrementing the
568    /// literal value and changing the `PredicateOperator` itself to its
569    /// inclusive variant.
570    fn project_binary_with_adjusted_boundary(
571        &self,
572        name: &str,
573        expr: &BinaryExpression<BoundReference>,
574        func: &BoxedTransformFunction,
575        width: Option<u32>,
576    ) -> Result<Option<Predicate>> {
577        if !self.can_transform(expr.literal()) {
578            return Ok(None);
579        }
580
581        let op = &expr.op();
582        let datum = &expr.literal();
583
584        if let Some(boundary) = Self::adjust_boundary(op, datum)? {
585            let transformed_projection = func.transform_literal_result(&boundary)?;
586
587            let adjusted_projection =
588                self.adjust_time_projection(op, datum, &transformed_projection);
589
590            let adjusted_operator = Self::adjust_operator(op, datum, width);
591
592            if let Some(op) = adjusted_operator {
593                let predicate = match adjusted_projection {
594                    None => Predicate::Binary(BinaryExpression::new(
595                        op,
596                        Reference::new(name),
597                        transformed_projection,
598                    )),
599                    Some(AdjustedProjection::Single(d)) => {
600                        Predicate::Binary(BinaryExpression::new(op, Reference::new(name), d))
601                    }
602                    Some(AdjustedProjection::Set(d)) => Predicate::Set(SetExpression::new(
603                        PredicateOperator::In,
604                        Reference::new(name),
605                        d,
606                    )),
607                };
608                return Ok(Some(predicate));
609            }
610        };
611
612        Ok(None)
613    }
614
615    /// Projects a set expression to a predicate,
616    /// applying a transformation to each literal in the set.
617    fn project_set_expr(
618        &self,
619        expr: &SetExpression<BoundReference>,
620        op: PredicateOperator,
621        name: &str,
622        func: &BoxedTransformFunction,
623    ) -> Result<Option<Predicate>> {
624        if expr.op() != op || expr.literals().iter().any(|d| !self.can_transform(d)) {
625            return Ok(None);
626        }
627
628        let mut new_set = FnvHashSet::default();
629
630        for lit in expr.literals() {
631            let datum = func.transform_literal_result(lit)?;
632
633            if let Some(AdjustedProjection::Single(d)) =
634                self.adjust_time_projection(&op, lit, &datum)
635            {
636                new_set.insert(d);
637            };
638
639            new_set.insert(datum);
640        }
641
642        Ok(Some(Predicate::Set(SetExpression::new(
643            expr.op(),
644            Reference::new(name),
645            new_set,
646        ))))
647    }
648
649    /// Adjusts the boundary value for comparison operations
650    /// based on the specified `PredicateOperator` and `Datum`.
651    ///
652    /// This function modifies the boundary value for certain comparison
653    /// operators (`LessThan`, `GreaterThan`) by incrementing or decrementing
654    /// the literal value within the given `Datum`. For operators that do not
655    /// imply a boundary shift (`Eq`, `LessThanOrEq`, `GreaterThanOrEq`,
656    /// `StartsWith`, `NotStartsWith`), the original datum is returned
657    /// unmodified.
658    fn adjust_boundary(op: &PredicateOperator, datum: &Datum) -> Result<Option<Datum>> {
659        let adjusted_boundary = match op {
660            PredicateOperator::LessThan => match (datum.data_type(), datum.literal()) {
661                (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => Some(Datum::int(v - 1)),
662                (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => Some(Datum::long(v - 1)),
663                (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Int128(v)) => {
664                    Some(Datum::decimal(decimal_from_i128_with_scale(v - 1, 0))?)
665                }
666                (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => Some(Datum::date(v - 1)),
667                (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => {
668                    Some(Datum::timestamp_micros(v - 1))
669                }
670                _ => Some(datum.to_owned()),
671            },
672            PredicateOperator::GreaterThan => match (datum.data_type(), datum.literal()) {
673                (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => Some(Datum::int(v + 1)),
674                (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => Some(Datum::long(v + 1)),
675                (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Int128(v)) => {
676                    Some(Datum::decimal(decimal_from_i128_with_scale(v + 1, 0))?)
677                }
678                (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => Some(Datum::date(v + 1)),
679                (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => {
680                    Some(Datum::timestamp_micros(v + 1))
681                }
682                _ => Some(datum.to_owned()),
683            },
684            PredicateOperator::Eq
685            | PredicateOperator::LessThanOrEq
686            | PredicateOperator::GreaterThanOrEq
687            | PredicateOperator::StartsWith
688            | PredicateOperator::NotStartsWith => Some(datum.to_owned()),
689            _ => None,
690        };
691
692        Ok(adjusted_boundary)
693    }
694
695    /// Adjusts the comparison operator based on the specified datum and an
696    /// optional width constraint.
697    ///
698    /// This function modifies the comparison operator for `LessThan` and
699    /// `GreaterThan` cases to their inclusive counterparts (`LessThanOrEq`,
700    /// `GreaterThanOrEq`) unconditionally. For `StartsWith` and
701    /// `NotStartsWith` operators acting on string literals, the operator may
702    /// be adjusted to `Eq` or `NotEq` if the string length matches the
703    /// specified width, indicating a precise match rather than a prefix
704    /// condition.
705    fn adjust_operator(
706        op: &PredicateOperator,
707        datum: &Datum,
708        width: Option<u32>,
709    ) -> Option<PredicateOperator> {
710        match op {
711            PredicateOperator::LessThan => Some(PredicateOperator::LessThanOrEq),
712            PredicateOperator::GreaterThan => Some(PredicateOperator::GreaterThanOrEq),
713            PredicateOperator::StartsWith => match datum.literal() {
714                PrimitiveLiteral::String(s) => {
715                    if let Some(w) = width
716                        && s.len() == w as usize
717                    {
718                        return Some(PredicateOperator::Eq);
719                    };
720                    Some(*op)
721                }
722                _ => Some(*op),
723            },
724            PredicateOperator::NotStartsWith => match datum.literal() {
725                PrimitiveLiteral::String(s) => {
726                    if let Some(w) = width {
727                        let w = w as usize;
728
729                        if s.len() == w {
730                            return Some(PredicateOperator::NotEq);
731                        }
732
733                        if s.len() < w {
734                            return Some(*op);
735                        }
736
737                        return None;
738                    };
739                    Some(*op)
740                }
741                _ => Some(*op),
742            },
743            _ => Some(*op),
744        }
745    }
746
747    /// Adjust projection for temporal transforms, align with Java
748    /// implementation: https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/transforms/ProjectionUtil.java#L275
749    fn adjust_time_projection(
750        &self,
751        op: &PredicateOperator,
752        original: &Datum,
753        transformed: &Datum,
754    ) -> Option<AdjustedProjection> {
755        let should_adjust = match self {
756            Transform::Day => matches!(original.data_type(), PrimitiveType::Timestamp),
757            Transform::Year | Transform::Month => true,
758            _ => false,
759        };
760
761        if should_adjust && let &PrimitiveLiteral::Int(v) = transformed.literal() {
762            match op {
763                PredicateOperator::LessThan
764                | PredicateOperator::LessThanOrEq
765                | PredicateOperator::In => {
766                    if v < 0 {
767                        // # TODO
768                        // An ugly hack to fix. Refine the increment and decrement logic later.
769                        match self {
770                            Transform::Day => {
771                                return Some(AdjustedProjection::Single(Datum::date(v + 1)));
772                            }
773                            _ => {
774                                return Some(AdjustedProjection::Single(Datum::int(v + 1)));
775                            }
776                        }
777                    };
778                }
779                PredicateOperator::Eq => {
780                    if v < 0 {
781                        let new_set = FnvHashSet::from_iter(vec![
782                            transformed.to_owned(),
783                            // # TODO
784                            // An ugly hack to fix. Refine the increment and decrement logic later.
785                            {
786                                match self {
787                                    Transform::Day => Datum::date(v + 1),
788                                    _ => Datum::int(v + 1),
789                                }
790                            },
791                        ]);
792                        return Some(AdjustedProjection::Set(new_set));
793                    }
794                }
795                _ => {
796                    return None;
797                }
798            }
799        };
800        None
801    }
802
803    // Increment for Int, Long, Decimal, Date, Timestamp
804    // Ignore other types
805    #[inline]
806    fn try_increment_number(datum: &Datum) -> Result<Datum> {
807        match (datum.data_type(), datum.literal()) {
808            (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => Ok(Datum::int(v + 1)),
809            (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => Ok(Datum::long(v + 1)),
810            (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Int128(v)) => {
811                Datum::decimal(decimal_from_i128_with_scale(v + 1, 0))
812            }
813            (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => Ok(Datum::date(v + 1)),
814            (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => {
815                Ok(Datum::timestamp_micros(v + 1))
816            }
817            (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => {
818                Ok(Datum::timestamp_nanos(v + 1))
819            }
820            (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(v)) => {
821                Ok(Datum::timestamptz_micros(v + 1))
822            }
823            (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(v)) => {
824                Ok(Datum::timestamptz_nanos(v + 1))
825            }
826            (PrimitiveType::Int, _)
827            | (PrimitiveType::Long, _)
828            | (PrimitiveType::Decimal { .. }, _)
829            | (PrimitiveType::Date, _)
830            | (PrimitiveType::Timestamp, _) => Err(Error::new(
831                ErrorKind::Unexpected,
832                format!(
833                    "Unsupported literal increment for type: {:?}",
834                    datum.data_type()
835                ),
836            )),
837            _ => Ok(datum.to_owned()),
838        }
839    }
840
841    // Decrement for Int, Long, Decimal, Date, Timestamp
842    // Ignore other types
843    #[inline]
844    fn try_decrement_number(datum: &Datum) -> Result<Datum> {
845        match (datum.data_type(), datum.literal()) {
846            (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => Ok(Datum::int(v - 1)),
847            (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => Ok(Datum::long(v - 1)),
848            (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Int128(v)) => {
849                Datum::decimal(decimal_from_i128_with_scale(v - 1, 0))
850            }
851            (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => Ok(Datum::date(v - 1)),
852            (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => {
853                Ok(Datum::timestamp_micros(v - 1))
854            }
855            (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => {
856                Ok(Datum::timestamp_nanos(v - 1))
857            }
858            (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(v)) => {
859                Ok(Datum::timestamptz_micros(v - 1))
860            }
861            (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(v)) => {
862                Ok(Datum::timestamptz_nanos(v - 1))
863            }
864            (PrimitiveType::Int, _)
865            | (PrimitiveType::Long, _)
866            | (PrimitiveType::Decimal { .. }, _)
867            | (PrimitiveType::Date, _)
868            | (PrimitiveType::Timestamp, _) => Err(Error::new(
869                ErrorKind::Unexpected,
870                format!(
871                    "Unsupported literal decrement for type: {:?}",
872                    datum.data_type()
873                ),
874            )),
875            _ => Ok(datum.to_owned()),
876        }
877    }
878
879    fn truncate_number_strict(
880        &self,
881        name: &str,
882        expr: &BinaryExpression<BoundReference>,
883        func: &BoxedTransformFunction,
884    ) -> Result<Option<Predicate>> {
885        let boundary = expr.literal();
886
887        if !matches!(
888            boundary.data_type(),
889            &PrimitiveType::Int
890                | &PrimitiveType::Long
891                | &PrimitiveType::Decimal { .. }
892                | &PrimitiveType::Date
893                | &PrimitiveType::Timestamp
894                | &PrimitiveType::Timestamptz
895                | &PrimitiveType::TimestampNs
896                | &PrimitiveType::TimestamptzNs
897        ) {
898            return Err(Error::new(
899                ErrorKind::DataInvalid,
900                format!("Expected a numeric literal, got: {boundary:?}"),
901            ));
902        }
903
904        let predicate = match expr.op() {
905            PredicateOperator::LessThan => Some(Predicate::Binary(BinaryExpression::new(
906                PredicateOperator::LessThan,
907                Reference::new(name),
908                func.transform_literal_result(boundary)?,
909            ))),
910            PredicateOperator::LessThanOrEq => Some(Predicate::Binary(BinaryExpression::new(
911                PredicateOperator::LessThan,
912                Reference::new(name),
913                func.transform_literal_result(&Self::try_increment_number(boundary)?)?,
914            ))),
915            PredicateOperator::GreaterThan => Some(Predicate::Binary(BinaryExpression::new(
916                PredicateOperator::GreaterThan,
917                Reference::new(name),
918                func.transform_literal_result(boundary)?,
919            ))),
920            PredicateOperator::GreaterThanOrEq => Some(Predicate::Binary(BinaryExpression::new(
921                PredicateOperator::GreaterThan,
922                Reference::new(name),
923                func.transform_literal_result(&Self::try_decrement_number(boundary)?)?,
924            ))),
925            PredicateOperator::NotEq => Some(Predicate::Binary(BinaryExpression::new(
926                PredicateOperator::NotEq,
927                Reference::new(name),
928                func.transform_literal_result(boundary)?,
929            ))),
930            _ => None,
931        };
932
933        Ok(predicate)
934    }
935
936    fn truncate_array_strict(
937        &self,
938        name: &str,
939        expr: &BinaryExpression<BoundReference>,
940        func: &BoxedTransformFunction,
941    ) -> Result<Option<Predicate>> {
942        let boundary = expr.literal();
943
944        match expr.op() {
945            PredicateOperator::LessThan | PredicateOperator::LessThanOrEq => {
946                Ok(Some(Predicate::Binary(BinaryExpression::new(
947                    PredicateOperator::LessThan,
948                    Reference::new(name),
949                    func.transform_literal_result(boundary)?,
950                ))))
951            }
952            PredicateOperator::GreaterThan | PredicateOperator::GreaterThanOrEq => {
953                Ok(Some(Predicate::Binary(BinaryExpression::new(
954                    PredicateOperator::GreaterThan,
955                    Reference::new(name),
956                    func.transform_literal_result(boundary)?,
957                ))))
958            }
959            PredicateOperator::NotEq => Ok(Some(Predicate::Binary(BinaryExpression::new(
960                PredicateOperator::NotEq,
961                Reference::new(name),
962                func.transform_literal_result(boundary)?,
963            )))),
964            _ => Ok(None),
965        }
966    }
967}
968
969impl Display for Transform {
970    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
971        match self {
972            Transform::Identity => write!(f, "identity"),
973            Transform::Year => write!(f, "year"),
974            Transform::Month => write!(f, "month"),
975            Transform::Day => write!(f, "day"),
976            Transform::Hour => write!(f, "hour"),
977            Transform::Void => write!(f, "void"),
978            Transform::Bucket(length) => write!(f, "bucket[{length}]"),
979            Transform::Truncate(width) => write!(f, "truncate[{width}]"),
980            Transform::Unknown => write!(f, "unknown"),
981        }
982    }
983}
984
985impl FromStr for Transform {
986    type Err = Error;
987
988    fn from_str(s: &str) -> Result<Self> {
989        let t = match s {
990            "identity" => Transform::Identity,
991            "year" => Transform::Year,
992            "month" => Transform::Month,
993            "day" => Transform::Day,
994            "hour" => Transform::Hour,
995            "void" => Transform::Void,
996            "unknown" => Transform::Unknown,
997            v if v.starts_with("bucket") => {
998                let length = v
999                    .strip_prefix("bucket")
1000                    .expect("transform must starts with `bucket`")
1001                    .trim_start_matches('[')
1002                    .trim_end_matches(']')
1003                    .parse()
1004                    .map_err(|err| {
1005                        Error::new(
1006                            ErrorKind::DataInvalid,
1007                            format!("transform bucket type {v:?} is invalid"),
1008                        )
1009                        .with_source(err)
1010                    })?;
1011
1012                Transform::Bucket(length)
1013            }
1014            v if v.starts_with("truncate") => {
1015                let width = v
1016                    .strip_prefix("truncate")
1017                    .expect("transform must starts with `truncate`")
1018                    .trim_start_matches('[')
1019                    .trim_end_matches(']')
1020                    .parse()
1021                    .map_err(|err| {
1022                        Error::new(
1023                            ErrorKind::DataInvalid,
1024                            format!("transform truncate type {v:?} is invalid"),
1025                        )
1026                        .with_source(err)
1027                    })?;
1028
1029                Transform::Truncate(width)
1030            }
1031            v => {
1032                return Err(Error::new(
1033                    ErrorKind::DataInvalid,
1034                    format!("transform {v:?} is invalid"),
1035                ));
1036            }
1037        };
1038
1039        Ok(t)
1040    }
1041}
1042
1043impl Serialize for Transform {
1044    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1045    where S: Serializer {
1046        serializer.serialize_str(format!("{self}").as_str())
1047    }
1048}
1049
1050impl<'de> Deserialize<'de> for Transform {
1051    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
1052    where D: Deserializer<'de> {
1053        let s = String::deserialize(deserializer)?;
1054        s.parse().map_err(<D::Error as serde::de::Error>::custom)
1055    }
1056}
1057
1058/// An enum representing the result of the adjusted projection.
1059/// Either being a single adjusted datum or a set.
1060#[derive(Debug)]
1061enum AdjustedProjection {
1062    Single(Datum),
1063    Set(FnvHashSet<Datum>),
1064}