iceberg/spec/
transform.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Transforms in iceberg.
19
20use std::cmp::Ordering;
21use std::fmt::{Display, Formatter};
22use std::str::FromStr;
23
24use fnv::FnvHashSet;
25use serde::{Deserialize, Deserializer, Serialize, Serializer};
26
27use super::{Datum, PrimitiveLiteral};
28use crate::ErrorKind;
29use crate::error::{Error, Result};
30use crate::expr::{
31    BinaryExpression, BoundPredicate, BoundReference, Predicate, PredicateOperator, Reference,
32    SetExpression, UnaryExpression,
33};
34use crate::spec::Literal;
35use crate::spec::datatypes::{PrimitiveType, Type};
36use crate::transform::{BoxedTransformFunction, create_transform_function};
37
/// A partition transform.
///
/// A transform maps a source column value to a partition value and is also
/// used to rewrite predicates on the source column into predicates on the
/// resulting partition values.
///
/// Deriving partition predicates from column predicates keeps logical
/// queries separate from physical storage: the partitioning can change and
/// the correct partition filters are still derived from column predicates,
/// so users don't have to supply both logical predicates and partition
/// predicates.
///
/// All transforms must return `null` for a `null` input value.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Transform {
    /// The source value, unmodified.
    ///
    /// - Source type: any primitive type.
    /// - Result type: the same as the source type.
    Identity,
    /// Hash of the value, mod `N`.
    ///
    /// Bucket partition transforms use a 32-bit hash of the source value.
    /// The 32-bit hash implementation is the 32-bit Murmur3 hash, x86
    /// variant, seeded with 0.
    ///
    /// The transform is parameterized by a number of buckets, `N`. The hash
    /// mod `N` must produce a positive value by first discarding the sign
    /// bit of the hash value. In pseudo-code, the function is:
    ///
    /// ```text
    /// def bucket_N(x) = (murmur3_x86_32_hash(x) & Integer.MAX_VALUE) % N
    /// ```
    ///
    /// - Source type: `int`, `long`, `decimal`, `date`, `time`,
    ///   `timestamp`, `timestamptz`, `timestamp_ns`, `timestamptz_ns`,
    ///   `string`, `uuid`, `fixed`, `binary`.
    /// - Result type: `int`.
    Bucket(u32),
    /// Value truncated to width `W`.
    ///
    /// For `int` and `long`:
    ///
    /// - `v - (v % W)`, where the remainder `v % W` must be positive
    /// - example: W=10: 1 → 0, -1 → -10
    ///
    /// For `decimal`:
    ///
    /// - `scaled_W = decimal(W, scale(v)) v - (v % scaled_W)`
    /// - example: W=50, s=2: 10.65 → 10.50
    ///
    /// For `string`:
    ///
    /// - Substring of length L: `v.substring(0, L)`
    /// - example: L=3: iceberg → ice
    /// - note: Strings are truncated to a valid UTF-8 string with no more
    ///   than L code points.
    ///
    /// - Source type: `int`, `long`, `decimal`, `string`, `binary`.
    /// - Result type: the same as the source type.
    Truncate(u32),
    /// Extracts a date or timestamp year, as years from 1970.
    ///
    /// - Source type: `date`, `timestamp`, `timestamptz`, `timestamp_ns`,
    ///   `timestamptz_ns`.
    /// - Result type: `int`.
    Year,
    /// Extracts a date or timestamp month, as months from 1970-01-01.
    ///
    /// - Source type: `date`, `timestamp`, `timestamptz`, `timestamp_ns`,
    ///   `timestamptz_ns`.
    /// - Result type: `int`.
    Month,
    /// Extracts a date or timestamp day, as days from 1970-01-01.
    ///
    /// - Source type: `date`, `timestamp`, `timestamptz`, `timestamp_ns`,
    ///   `timestamptz_ns`.
    /// - Result type: a day ordinal, which `result_type` reports as `date`.
    Day,
    /// Extracts a timestamp hour, as hours from 1970-01-01 00:00:00.
    ///
    /// - Source type: `timestamp`, `timestamptz`, `timestamp_ns`,
    ///   `timestamptz_ns`.
    /// - Result type: `int`.
    Hour,
    /// Always produces `null`.
    ///
    /// The void transform may be used to replace the transform in an
    /// existing partition field so that the field is effectively dropped in
    /// v1 tables.
    ///
    /// - Source type: any type.
    /// - Result type: the same as the source type.
    Void,
    /// A customized transform that is not recognized or not supported yet.
    Unknown,
}
136
137impl Transform {
138    /// Returns a human-readable String representation of a transformed value.
139    pub fn to_human_string(&self, field_type: &Type, value: Option<&Literal>) -> String {
140        let Some(value) = value else {
141            return "null".to_string();
142        };
143
144        if let Some(value) = value.as_primitive_literal() {
145            let field_type = field_type.as_primitive_type().unwrap();
146            let datum = Datum::new(field_type.clone(), value);
147
148            match self {
149                Self::Void => "null".to_string(),
150                _ => datum.to_human_string(),
151            }
152        } else {
153            "null".to_string()
154        }
155    }
156
157    /// Get the return type of transform given the input type.
158    /// Returns `None` if it can't be transformed.
159    pub fn result_type(&self, input_type: &Type) -> Result<Type> {
160        match self {
161            Transform::Identity => {
162                if matches!(input_type, Type::Primitive(_)) {
163                    Ok(input_type.clone())
164                } else {
165                    Err(Error::new(
166                        ErrorKind::DataInvalid,
167                        format!("{input_type} is not a valid input type of identity transform",),
168                    ))
169                }
170            }
171            Transform::Void => Ok(input_type.clone()),
172            Transform::Unknown => Ok(Type::Primitive(PrimitiveType::String)),
173            Transform::Bucket(_) => {
174                if let Type::Primitive(p) = input_type {
175                    match p {
176                        PrimitiveType::Int
177                        | PrimitiveType::Long
178                        | PrimitiveType::Decimal { .. }
179                        | PrimitiveType::Date
180                        | PrimitiveType::Time
181                        | PrimitiveType::Timestamp
182                        | PrimitiveType::Timestamptz
183                        | PrimitiveType::TimestampNs
184                        | PrimitiveType::TimestamptzNs
185                        | PrimitiveType::String
186                        | PrimitiveType::Uuid
187                        | PrimitiveType::Fixed(_)
188                        | PrimitiveType::Binary => Ok(Type::Primitive(PrimitiveType::Int)),
189                        _ => Err(Error::new(
190                            ErrorKind::DataInvalid,
191                            format!("{input_type} is not a valid input type of bucket transform",),
192                        )),
193                    }
194                } else {
195                    Err(Error::new(
196                        ErrorKind::DataInvalid,
197                        format!("{input_type} is not a valid input type of bucket transform",),
198                    ))
199                }
200            }
201            Transform::Truncate(_) => {
202                if let Type::Primitive(p) = input_type {
203                    match p {
204                        PrimitiveType::Int
205                        | PrimitiveType::Long
206                        | PrimitiveType::String
207                        | PrimitiveType::Binary
208                        | PrimitiveType::Decimal { .. } => Ok(input_type.clone()),
209                        _ => Err(Error::new(
210                            ErrorKind::DataInvalid,
211                            format!("{input_type} is not a valid input type of truncate transform",),
212                        )),
213                    }
214                } else {
215                    Err(Error::new(
216                        ErrorKind::DataInvalid,
217                        format!("{input_type} is not a valid input type of truncate transform",),
218                    ))
219                }
220            }
221            Transform::Year | Transform::Month => {
222                if let Type::Primitive(p) = input_type {
223                    match p {
224                        PrimitiveType::Timestamp
225                        | PrimitiveType::Timestamptz
226                        | PrimitiveType::TimestampNs
227                        | PrimitiveType::TimestamptzNs
228                        | PrimitiveType::Date => Ok(Type::Primitive(PrimitiveType::Int)),
229                        _ => Err(Error::new(
230                            ErrorKind::DataInvalid,
231                            format!("{input_type} is not a valid input type of {self} transform",),
232                        )),
233                    }
234                } else {
235                    Err(Error::new(
236                        ErrorKind::DataInvalid,
237                        format!("{input_type} is not a valid input type of {self} transform",),
238                    ))
239                }
240            }
241            Transform::Day => {
242                if let Type::Primitive(p) = input_type {
243                    match p {
244                        PrimitiveType::Timestamp
245                        | PrimitiveType::Timestamptz
246                        | PrimitiveType::TimestampNs
247                        | PrimitiveType::TimestamptzNs
248                        | PrimitiveType::Date => Ok(Type::Primitive(PrimitiveType::Date)),
249                        _ => Err(Error::new(
250                            ErrorKind::DataInvalid,
251                            format!("{input_type} is not a valid input type of {self} transform",),
252                        )),
253                    }
254                } else {
255                    Err(Error::new(
256                        ErrorKind::DataInvalid,
257                        format!("{input_type} is not a valid input type of {self} transform",),
258                    ))
259                }
260            }
261            Transform::Hour => {
262                if let Type::Primitive(p) = input_type {
263                    match p {
264                        PrimitiveType::Timestamp
265                        | PrimitiveType::Timestamptz
266                        | PrimitiveType::TimestampNs
267                        | PrimitiveType::TimestamptzNs => Ok(Type::Primitive(PrimitiveType::Int)),
268                        _ => Err(Error::new(
269                            ErrorKind::DataInvalid,
270                            format!("{input_type} is not a valid input type of {self} transform",),
271                        )),
272                    }
273                } else {
274                    Err(Error::new(
275                        ErrorKind::DataInvalid,
276                        format!("{input_type} is not a valid input type of {self} transform",),
277                    ))
278                }
279            }
280        }
281    }
282
283    /// Whether the transform preserves the order of values.
284    pub fn preserves_order(&self) -> bool {
285        !matches!(
286            self,
287            Transform::Void | Transform::Bucket(_) | Transform::Unknown
288        )
289    }
290
291    /// Return the unique transform name to check if similar transforms for the same source field
292    /// are added multiple times in partition spec builder.
293    pub fn dedup_name(&self) -> String {
294        match self {
295            Transform::Year | Transform::Month | Transform::Day | Transform::Hour => {
296                "time".to_string()
297            }
298            _ => format!("{self}"),
299        }
300    }
301
302    /// Whether ordering by this transform's result satisfies the ordering of another transform's
303    /// result.
304    ///
305    /// For example, sorting by day(ts) will produce an ordering that is also by month(ts) or
306    ///  year(ts). However, sorting by day(ts) will not satisfy the order of hour(ts) or identity(ts).
307    pub fn satisfies_order_of(&self, other: &Self) -> bool {
308        match self {
309            Transform::Identity => other.preserves_order(),
310            Transform::Hour => matches!(
311                other,
312                Transform::Hour | Transform::Day | Transform::Month | Transform::Year
313            ),
314            Transform::Day => matches!(other, Transform::Day | Transform::Month | Transform::Year),
315            Transform::Month => matches!(other, Transform::Month | Transform::Year),
316            _ => self == other,
317        }
318    }
319
320    /// Strictly projects a given predicate according to the transformation
321    /// specified by the `Transform` instance.
322    ///
323    /// This method ensures that the projected predicate is strictly aligned
324    /// with the transformation logic, providing a more precise filtering
325    /// mechanism for transformed data.
326    ///
327    /// # Example
328    /// Suppose, we have row filter `a = 10`, and a partition spec
329    /// `bucket(a, 37) as bs`, if one row matches `a = 10`, then its partition
330    /// value should match `bucket(10, 37) as bs`, and we project `a = 10` to
331    /// `bs = bucket(10, 37)`
332    pub fn strict_project(
333        &self,
334        name: &str,
335        predicate: &BoundPredicate,
336    ) -> Result<Option<Predicate>> {
337        let func = create_transform_function(self)?;
338
339        match self {
340            Transform::Identity => match predicate {
341                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
342                BoundPredicate::Binary(expr) => Ok(Some(Predicate::Binary(BinaryExpression::new(
343                    expr.op(),
344                    Reference::new(name),
345                    expr.literal().to_owned(),
346                )))),
347                BoundPredicate::Set(expr) => Ok(Some(Predicate::Set(SetExpression::new(
348                    expr.op(),
349                    Reference::new(name),
350                    expr.literals().to_owned(),
351                )))),
352                _ => Ok(None),
353            },
354            Transform::Bucket(_) => match predicate {
355                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
356                BoundPredicate::Binary(expr) => {
357                    self.project_binary_expr(name, PredicateOperator::NotEq, expr, &func)
358                }
359                BoundPredicate::Set(expr) => {
360                    self.project_set_expr(expr, PredicateOperator::NotIn, name, &func)
361                }
362                _ => Ok(None),
363            },
364            Transform::Truncate(width) => match predicate {
365                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
366                BoundPredicate::Binary(expr) => {
367                    if matches!(
368                        expr.term().field().field_type.as_primitive_type(),
369                        Some(&PrimitiveType::Int)
370                            | Some(&PrimitiveType::Long)
371                            | Some(&PrimitiveType::Decimal { .. })
372                    ) {
373                        self.truncate_number_strict(name, expr, &func)
374                    } else if expr.op() == PredicateOperator::StartsWith {
375                        let len = match expr.literal().literal() {
376                            PrimitiveLiteral::String(s) => s.len(),
377                            PrimitiveLiteral::Binary(b) => b.len(),
378                            _ => {
379                                return Err(Error::new(
380                                    ErrorKind::DataInvalid,
381                                    format!(
382                                        "Expected a string or binary literal, got: {:?}",
383                                        expr.literal()
384                                    ),
385                                ));
386                            }
387                        };
388                        match len.cmp(&(*width as usize)) {
389                            Ordering::Less => Ok(Some(Predicate::Binary(BinaryExpression::new(
390                                PredicateOperator::StartsWith,
391                                Reference::new(name),
392                                expr.literal().to_owned(),
393                            )))),
394                            Ordering::Equal => Ok(Some(Predicate::Binary(BinaryExpression::new(
395                                PredicateOperator::Eq,
396                                Reference::new(name),
397                                expr.literal().to_owned(),
398                            )))),
399                            Ordering::Greater => Ok(None),
400                        }
401                    } else if expr.op() == PredicateOperator::NotStartsWith {
402                        let len = match expr.literal().literal() {
403                            PrimitiveLiteral::String(s) => s.len(),
404                            PrimitiveLiteral::Binary(b) => b.len(),
405                            _ => {
406                                return Err(Error::new(
407                                    ErrorKind::DataInvalid,
408                                    format!(
409                                        "Expected a string or binary literal, got: {:?}",
410                                        expr.literal()
411                                    ),
412                                ));
413                            }
414                        };
415                        match len.cmp(&(*width as usize)) {
416                            Ordering::Less => Ok(Some(Predicate::Binary(BinaryExpression::new(
417                                PredicateOperator::NotStartsWith,
418                                Reference::new(name),
419                                expr.literal().to_owned(),
420                            )))),
421                            Ordering::Equal => Ok(Some(Predicate::Binary(BinaryExpression::new(
422                                PredicateOperator::NotEq,
423                                Reference::new(name),
424                                expr.literal().to_owned(),
425                            )))),
426                            Ordering::Greater => {
427                                Ok(Some(Predicate::Binary(BinaryExpression::new(
428                                    expr.op(),
429                                    Reference::new(name),
430                                    func.transform_literal_result(expr.literal())?,
431                                ))))
432                            }
433                        }
434                    } else {
435                        self.truncate_array_strict(name, expr, &func)
436                    }
437                }
438                BoundPredicate::Set(expr) => {
439                    self.project_set_expr(expr, PredicateOperator::NotIn, name, &func)
440                }
441                _ => Ok(None),
442            },
443            Transform::Year | Transform::Month | Transform::Day | Transform::Hour => {
444                match predicate {
445                    BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
446                    BoundPredicate::Binary(expr) => self.truncate_number_strict(name, expr, &func),
447                    BoundPredicate::Set(expr) => {
448                        self.project_set_expr(expr, PredicateOperator::NotIn, name, &func)
449                    }
450                    _ => Ok(None),
451                }
452            }
453            _ => Ok(None),
454        }
455    }
456
457    /// Projects a given predicate according to the transformation
458    /// specified by the `Transform` instance.
459    ///
460    /// This allows predicates to be effectively applied to data
461    /// that has undergone transformation, enabling efficient querying
462    /// and filtering based on the original, untransformed data.
463    ///
464    /// # Example
465    /// Suppose, we have row filter `a = 10`, and a partition spec
466    /// `bucket(a, 37) as bs`, if one row matches `a = 10`, then its partition
467    /// value should match `bucket(10, 37) as bs`, and we project `a = 10` to
468    /// `bs = bucket(10, 37)`
469    pub fn project(&self, name: &str, predicate: &BoundPredicate) -> Result<Option<Predicate>> {
470        let func = create_transform_function(self)?;
471
472        match self {
473            Transform::Identity => match predicate {
474                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
475                BoundPredicate::Binary(expr) => Ok(Some(Predicate::Binary(BinaryExpression::new(
476                    expr.op(),
477                    Reference::new(name),
478                    expr.literal().to_owned(),
479                )))),
480                BoundPredicate::Set(expr) => Ok(Some(Predicate::Set(SetExpression::new(
481                    expr.op(),
482                    Reference::new(name),
483                    expr.literals().to_owned(),
484                )))),
485                _ => Ok(None),
486            },
487            Transform::Bucket(_) => match predicate {
488                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
489                BoundPredicate::Binary(expr) => {
490                    self.project_binary_expr(name, PredicateOperator::Eq, expr, &func)
491                }
492                BoundPredicate::Set(expr) => {
493                    self.project_set_expr(expr, PredicateOperator::In, name, &func)
494                }
495                _ => Ok(None),
496            },
497            Transform::Truncate(width) => match predicate {
498                BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
499                BoundPredicate::Binary(expr) => {
500                    self.project_binary_with_adjusted_boundary(name, expr, &func, Some(*width))
501                }
502                BoundPredicate::Set(expr) => {
503                    self.project_set_expr(expr, PredicateOperator::In, name, &func)
504                }
505                _ => Ok(None),
506            },
507            Transform::Year | Transform::Month | Transform::Day | Transform::Hour => {
508                match predicate {
509                    BoundPredicate::Unary(expr) => Self::project_unary(expr.op(), name),
510                    BoundPredicate::Binary(expr) => {
511                        self.project_binary_with_adjusted_boundary(name, expr, &func, None)
512                    }
513                    BoundPredicate::Set(expr) => {
514                        self.project_set_expr(expr, PredicateOperator::In, name, &func)
515                    }
516                    _ => Ok(None),
517                }
518            }
519            _ => Ok(None),
520        }
521    }
522
523    /// Check if `Transform` is applicable on datum's `PrimitiveType`
524    fn can_transform(&self, datum: &Datum) -> bool {
525        let input_type = datum.data_type().clone();
526        self.result_type(&Type::Primitive(input_type)).is_ok()
527    }
528
529    /// Creates a unary predicate from a given operator and a reference name.
530    fn project_unary(op: PredicateOperator, name: &str) -> Result<Option<Predicate>> {
531        Ok(Some(Predicate::Unary(UnaryExpression::new(
532            op,
533            Reference::new(name),
534        ))))
535    }
536
537    /// Attempts to create a binary predicate based on a binary expression,
538    /// if applicable.
539    ///
540    /// This method evaluates a given binary expression and, if the operation
541    /// is the given operator and the literal can be transformed, constructs a
542    /// `Predicate::Binary`variant representing the binary operation.
543    fn project_binary_expr(
544        &self,
545        name: &str,
546        op: PredicateOperator,
547        expr: &BinaryExpression<BoundReference>,
548        func: &BoxedTransformFunction,
549    ) -> Result<Option<Predicate>> {
550        if expr.op() != op || !self.can_transform(expr.literal()) {
551            return Ok(None);
552        }
553
554        Ok(Some(Predicate::Binary(BinaryExpression::new(
555            expr.op(),
556            Reference::new(name),
557            func.transform_literal_result(expr.literal())?,
558        ))))
559    }
560
561    /// Projects a binary expression to a predicate with an adjusted boundary.
562    ///
563    /// Checks if the literal within the given binary expression is
564    /// transformable. If transformable, it proceeds to potentially adjust
565    /// the boundary of the expression based on the comparison operator (`op`).
566    /// The potential adjustments involve incrementing or decrementing the
567    /// literal value and changing the `PredicateOperator` itself to its
568    /// inclusive variant.
569    fn project_binary_with_adjusted_boundary(
570        &self,
571        name: &str,
572        expr: &BinaryExpression<BoundReference>,
573        func: &BoxedTransformFunction,
574        width: Option<u32>,
575    ) -> Result<Option<Predicate>> {
576        if !self.can_transform(expr.literal()) {
577            return Ok(None);
578        }
579
580        let op = &expr.op();
581        let datum = &expr.literal();
582
583        if let Some(boundary) = Self::adjust_boundary(op, datum)? {
584            let transformed_projection = func.transform_literal_result(&boundary)?;
585
586            let adjusted_projection =
587                self.adjust_time_projection(op, datum, &transformed_projection);
588
589            let adjusted_operator = Self::adjust_operator(op, datum, width);
590
591            if let Some(op) = adjusted_operator {
592                let predicate = match adjusted_projection {
593                    None => Predicate::Binary(BinaryExpression::new(
594                        op,
595                        Reference::new(name),
596                        transformed_projection,
597                    )),
598                    Some(AdjustedProjection::Single(d)) => {
599                        Predicate::Binary(BinaryExpression::new(op, Reference::new(name), d))
600                    }
601                    Some(AdjustedProjection::Set(d)) => Predicate::Set(SetExpression::new(
602                        PredicateOperator::In,
603                        Reference::new(name),
604                        d,
605                    )),
606                };
607                return Ok(Some(predicate));
608            }
609        };
610
611        Ok(None)
612    }
613
614    /// Projects a set expression to a predicate,
615    /// applying a transformation to each literal in the set.
616    fn project_set_expr(
617        &self,
618        expr: &SetExpression<BoundReference>,
619        op: PredicateOperator,
620        name: &str,
621        func: &BoxedTransformFunction,
622    ) -> Result<Option<Predicate>> {
623        if expr.op() != op || expr.literals().iter().any(|d| !self.can_transform(d)) {
624            return Ok(None);
625        }
626
627        let mut new_set = FnvHashSet::default();
628
629        for lit in expr.literals() {
630            let datum = func.transform_literal_result(lit)?;
631
632            if let Some(AdjustedProjection::Single(d)) =
633                self.adjust_time_projection(&op, lit, &datum)
634            {
635                new_set.insert(d);
636            };
637
638            new_set.insert(datum);
639        }
640
641        Ok(Some(Predicate::Set(SetExpression::new(
642            expr.op(),
643            Reference::new(name),
644            new_set,
645        ))))
646    }
647
648    /// Adjusts the boundary value for comparison operations
649    /// based on the specified `PredicateOperator` and `Datum`.
650    ///
651    /// This function modifies the boundary value for certain comparison
652    /// operators (`LessThan`, `GreaterThan`) by incrementing or decrementing
653    /// the literal value within the given `Datum`. For operators that do not
654    /// imply a boundary shift (`Eq`, `LessThanOrEq`, `GreaterThanOrEq`,
655    /// `StartsWith`, `NotStartsWith`), the original datum is returned
656    /// unmodified.
657    fn adjust_boundary(op: &PredicateOperator, datum: &Datum) -> Result<Option<Datum>> {
658        let adjusted_boundary = match op {
659            PredicateOperator::LessThan => match (datum.data_type(), datum.literal()) {
660                (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => Some(Datum::int(v - 1)),
661                (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => Some(Datum::long(v - 1)),
662                (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Int128(v)) => {
663                    Some(Datum::decimal(v - 1)?)
664                }
665                (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => Some(Datum::date(v - 1)),
666                (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => {
667                    Some(Datum::timestamp_micros(v - 1))
668                }
669                _ => Some(datum.to_owned()),
670            },
671            PredicateOperator::GreaterThan => match (datum.data_type(), datum.literal()) {
672                (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => Some(Datum::int(v + 1)),
673                (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => Some(Datum::long(v + 1)),
674                (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Int128(v)) => {
675                    Some(Datum::decimal(v + 1)?)
676                }
677                (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => Some(Datum::date(v + 1)),
678                (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => {
679                    Some(Datum::timestamp_micros(v + 1))
680                }
681                _ => Some(datum.to_owned()),
682            },
683            PredicateOperator::Eq
684            | PredicateOperator::LessThanOrEq
685            | PredicateOperator::GreaterThanOrEq
686            | PredicateOperator::StartsWith
687            | PredicateOperator::NotStartsWith => Some(datum.to_owned()),
688            _ => None,
689        };
690
691        Ok(adjusted_boundary)
692    }
693
694    /// Adjusts the comparison operator based on the specified datum and an
695    /// optional width constraint.
696    ///
697    /// This function modifies the comparison operator for `LessThan` and
698    /// `GreaterThan` cases to their inclusive counterparts (`LessThanOrEq`,
699    /// `GreaterThanOrEq`) unconditionally. For `StartsWith` and
700    /// `NotStartsWith` operators acting on string literals, the operator may
701    /// be adjusted to `Eq` or `NotEq` if the string length matches the
702    /// specified width, indicating a precise match rather than a prefix
703    /// condition.
704    fn adjust_operator(
705        op: &PredicateOperator,
706        datum: &Datum,
707        width: Option<u32>,
708    ) -> Option<PredicateOperator> {
709        match op {
710            PredicateOperator::LessThan => Some(PredicateOperator::LessThanOrEq),
711            PredicateOperator::GreaterThan => Some(PredicateOperator::GreaterThanOrEq),
712            PredicateOperator::StartsWith => match datum.literal() {
713                PrimitiveLiteral::String(s) => {
714                    if let Some(w) = width {
715                        if s.len() == w as usize {
716                            return Some(PredicateOperator::Eq);
717                        };
718                    };
719                    Some(*op)
720                }
721                _ => Some(*op),
722            },
723            PredicateOperator::NotStartsWith => match datum.literal() {
724                PrimitiveLiteral::String(s) => {
725                    if let Some(w) = width {
726                        let w = w as usize;
727
728                        if s.len() == w {
729                            return Some(PredicateOperator::NotEq);
730                        }
731
732                        if s.len() < w {
733                            return Some(*op);
734                        }
735
736                        return None;
737                    };
738                    Some(*op)
739                }
740                _ => Some(*op),
741            },
742            _ => Some(*op),
743        }
744    }
745
746    /// Adjust projection for temporal transforms, align with Java
747    /// implementation: https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/transforms/ProjectionUtil.java#L275
748    fn adjust_time_projection(
749        &self,
750        op: &PredicateOperator,
751        original: &Datum,
752        transformed: &Datum,
753    ) -> Option<AdjustedProjection> {
754        let should_adjust = match self {
755            Transform::Day => matches!(original.data_type(), PrimitiveType::Timestamp),
756            Transform::Year | Transform::Month => true,
757            _ => false,
758        };
759
760        if should_adjust {
761            if let &PrimitiveLiteral::Int(v) = transformed.literal() {
762                match op {
763                    PredicateOperator::LessThan
764                    | PredicateOperator::LessThanOrEq
765                    | PredicateOperator::In => {
766                        if v < 0 {
767                            // # TODO
768                            // An ugly hack to fix. Refine the increment and decrement logic later.
769                            match self {
770                                Transform::Day => {
771                                    return Some(AdjustedProjection::Single(Datum::date(v + 1)));
772                                }
773                                _ => {
774                                    return Some(AdjustedProjection::Single(Datum::int(v + 1)));
775                                }
776                            }
777                        };
778                    }
779                    PredicateOperator::Eq => {
780                        if v < 0 {
781                            let new_set = FnvHashSet::from_iter(vec![
782                                transformed.to_owned(),
783                                // # TODO
784                                // An ugly hack to fix. Refine the increment and decrement logic later.
785                                {
786                                    match self {
787                                        Transform::Day => Datum::date(v + 1),
788                                        _ => Datum::int(v + 1),
789                                    }
790                                },
791                            ]);
792                            return Some(AdjustedProjection::Set(new_set));
793                        }
794                    }
795                    _ => {
796                        return None;
797                    }
798                }
799            };
800        }
801        None
802    }
803
804    // Increment for Int, Long, Decimal, Date, Timestamp
805    // Ignore other types
806    #[inline]
807    fn try_increment_number(datum: &Datum) -> Result<Datum> {
808        match (datum.data_type(), datum.literal()) {
809            (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => Ok(Datum::int(v + 1)),
810            (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => Ok(Datum::long(v + 1)),
811            (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Int128(v)) => Datum::decimal(v + 1),
812            (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => Ok(Datum::date(v + 1)),
813            (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => {
814                Ok(Datum::timestamp_micros(v + 1))
815            }
816            (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => {
817                Ok(Datum::timestamp_nanos(v + 1))
818            }
819            (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(v)) => {
820                Ok(Datum::timestamptz_micros(v + 1))
821            }
822            (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(v)) => {
823                Ok(Datum::timestamptz_nanos(v + 1))
824            }
825            (PrimitiveType::Int, _)
826            | (PrimitiveType::Long, _)
827            | (PrimitiveType::Decimal { .. }, _)
828            | (PrimitiveType::Date, _)
829            | (PrimitiveType::Timestamp, _) => Err(Error::new(
830                ErrorKind::Unexpected,
831                format!(
832                    "Unsupported literal increment for type: {:?}",
833                    datum.data_type()
834                ),
835            )),
836            _ => Ok(datum.to_owned()),
837        }
838    }
839
840    // Decrement for Int, Long, Decimal, Date, Timestamp
841    // Ignore other types
842    #[inline]
843    fn try_decrement_number(datum: &Datum) -> Result<Datum> {
844        match (datum.data_type(), datum.literal()) {
845            (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => Ok(Datum::int(v - 1)),
846            (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => Ok(Datum::long(v - 1)),
847            (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Int128(v)) => Datum::decimal(v - 1),
848            (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => Ok(Datum::date(v - 1)),
849            (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => {
850                Ok(Datum::timestamp_micros(v - 1))
851            }
852            (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => {
853                Ok(Datum::timestamp_nanos(v - 1))
854            }
855            (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(v)) => {
856                Ok(Datum::timestamptz_micros(v - 1))
857            }
858            (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(v)) => {
859                Ok(Datum::timestamptz_nanos(v - 1))
860            }
861            (PrimitiveType::Int, _)
862            | (PrimitiveType::Long, _)
863            | (PrimitiveType::Decimal { .. }, _)
864            | (PrimitiveType::Date, _)
865            | (PrimitiveType::Timestamp, _) => Err(Error::new(
866                ErrorKind::Unexpected,
867                format!(
868                    "Unsupported literal decrement for type: {:?}",
869                    datum.data_type()
870                ),
871            )),
872            _ => Ok(datum.to_owned()),
873        }
874    }
875
876    fn truncate_number_strict(
877        &self,
878        name: &str,
879        expr: &BinaryExpression<BoundReference>,
880        func: &BoxedTransformFunction,
881    ) -> Result<Option<Predicate>> {
882        let boundary = expr.literal();
883
884        if !matches!(
885            boundary.data_type(),
886            &PrimitiveType::Int
887                | &PrimitiveType::Long
888                | &PrimitiveType::Decimal { .. }
889                | &PrimitiveType::Date
890                | &PrimitiveType::Timestamp
891                | &PrimitiveType::Timestamptz
892                | &PrimitiveType::TimestampNs
893                | &PrimitiveType::TimestamptzNs
894        ) {
895            return Err(Error::new(
896                ErrorKind::DataInvalid,
897                format!("Expected a numeric literal, got: {boundary:?}"),
898            ));
899        }
900
901        let predicate = match expr.op() {
902            PredicateOperator::LessThan => Some(Predicate::Binary(BinaryExpression::new(
903                PredicateOperator::LessThan,
904                Reference::new(name),
905                func.transform_literal_result(boundary)?,
906            ))),
907            PredicateOperator::LessThanOrEq => Some(Predicate::Binary(BinaryExpression::new(
908                PredicateOperator::LessThan,
909                Reference::new(name),
910                func.transform_literal_result(&Self::try_increment_number(boundary)?)?,
911            ))),
912            PredicateOperator::GreaterThan => Some(Predicate::Binary(BinaryExpression::new(
913                PredicateOperator::GreaterThan,
914                Reference::new(name),
915                func.transform_literal_result(boundary)?,
916            ))),
917            PredicateOperator::GreaterThanOrEq => Some(Predicate::Binary(BinaryExpression::new(
918                PredicateOperator::GreaterThan,
919                Reference::new(name),
920                func.transform_literal_result(&Self::try_decrement_number(boundary)?)?,
921            ))),
922            PredicateOperator::NotEq => Some(Predicate::Binary(BinaryExpression::new(
923                PredicateOperator::NotEq,
924                Reference::new(name),
925                func.transform_literal_result(boundary)?,
926            ))),
927            _ => None,
928        };
929
930        Ok(predicate)
931    }
932
933    fn truncate_array_strict(
934        &self,
935        name: &str,
936        expr: &BinaryExpression<BoundReference>,
937        func: &BoxedTransformFunction,
938    ) -> Result<Option<Predicate>> {
939        let boundary = expr.literal();
940
941        match expr.op() {
942            PredicateOperator::LessThan | PredicateOperator::LessThanOrEq => {
943                Ok(Some(Predicate::Binary(BinaryExpression::new(
944                    PredicateOperator::LessThan,
945                    Reference::new(name),
946                    func.transform_literal_result(boundary)?,
947                ))))
948            }
949            PredicateOperator::GreaterThan | PredicateOperator::GreaterThanOrEq => {
950                Ok(Some(Predicate::Binary(BinaryExpression::new(
951                    PredicateOperator::GreaterThan,
952                    Reference::new(name),
953                    func.transform_literal_result(boundary)?,
954                ))))
955            }
956            PredicateOperator::NotEq => Ok(Some(Predicate::Binary(BinaryExpression::new(
957                PredicateOperator::NotEq,
958                Reference::new(name),
959                func.transform_literal_result(boundary)?,
960            )))),
961            _ => Ok(None),
962        }
963    }
964}
965
966impl Display for Transform {
967    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
968        match self {
969            Transform::Identity => write!(f, "identity"),
970            Transform::Year => write!(f, "year"),
971            Transform::Month => write!(f, "month"),
972            Transform::Day => write!(f, "day"),
973            Transform::Hour => write!(f, "hour"),
974            Transform::Void => write!(f, "void"),
975            Transform::Bucket(length) => write!(f, "bucket[{length}]"),
976            Transform::Truncate(width) => write!(f, "truncate[{width}]"),
977            Transform::Unknown => write!(f, "unknown"),
978        }
979    }
980}
981
982impl FromStr for Transform {
983    type Err = Error;
984
985    fn from_str(s: &str) -> Result<Self> {
986        let t = match s {
987            "identity" => Transform::Identity,
988            "year" => Transform::Year,
989            "month" => Transform::Month,
990            "day" => Transform::Day,
991            "hour" => Transform::Hour,
992            "void" => Transform::Void,
993            "unknown" => Transform::Unknown,
994            v if v.starts_with("bucket") => {
995                let length = v
996                    .strip_prefix("bucket")
997                    .expect("transform must starts with `bucket`")
998                    .trim_start_matches('[')
999                    .trim_end_matches(']')
1000                    .parse()
1001                    .map_err(|err| {
1002                        Error::new(
1003                            ErrorKind::DataInvalid,
1004                            format!("transform bucket type {v:?} is invalid"),
1005                        )
1006                        .with_source(err)
1007                    })?;
1008
1009                Transform::Bucket(length)
1010            }
1011            v if v.starts_with("truncate") => {
1012                let width = v
1013                    .strip_prefix("truncate")
1014                    .expect("transform must starts with `truncate`")
1015                    .trim_start_matches('[')
1016                    .trim_end_matches(']')
1017                    .parse()
1018                    .map_err(|err| {
1019                        Error::new(
1020                            ErrorKind::DataInvalid,
1021                            format!("transform truncate type {v:?} is invalid"),
1022                        )
1023                        .with_source(err)
1024                    })?;
1025
1026                Transform::Truncate(width)
1027            }
1028            v => {
1029                return Err(Error::new(
1030                    ErrorKind::DataInvalid,
1031                    format!("transform {v:?} is invalid"),
1032                ));
1033            }
1034        };
1035
1036        Ok(t)
1037    }
1038}
1039
1040impl Serialize for Transform {
1041    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1042    where S: Serializer {
1043        serializer.serialize_str(format!("{self}").as_str())
1044    }
1045}
1046
1047impl<'de> Deserialize<'de> for Transform {
1048    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
1049    where D: Deserializer<'de> {
1050        let s = String::deserialize(deserializer)?;
1051        s.parse().map_err(<D::Error as serde::de::Error>::custom)
1052    }
1053}
1054
/// An enum representing the result of the adjusted projection.
/// Either being a single adjusted datum or a set.
///
/// Produced by `Transform::adjust_time_projection`.
#[derive(Debug)]
enum AdjustedProjection {
    /// A single replacement datum; returned for the `LessThan`,
    /// `LessThanOrEq` and `In` operators.
    Single(Datum),
    /// A set of candidate datums; returned for the `Eq` operator, holding the
    /// transformed datum together with the transformed value plus one.
    Set(FnvHashSet<Datum>),
}