iceberg_datafusion/physical_plan/
expr_to_predicate.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::vec;
19
20use datafusion::arrow::datatypes::DataType;
21use datafusion::logical_expr::{Expr, Operator};
22use datafusion::scalar::ScalarValue;
23use iceberg::expr::{BinaryExpression, Predicate, PredicateOperator, Reference, UnaryExpression};
24use iceberg::spec::Datum;
25
// A datafusion expression could be an Iceberg predicate, column, or literal.
// `NotTransformed` marks an expression that has no Iceberg counterpart.
enum TransformedResult {
    // A complete boolean Iceberg predicate.
    Predicate(Predicate),
    // A reference to a column, built from the DataFusion column name.
    Column(Reference),
    // A constant value converted to an Iceberg datum.
    Literal(Datum),
    // The expression could not be converted.
    NotTransformed,
}
33
// Outcome of mapping a DataFusion binary operator to its Iceberg counterpart.
enum OpTransformedResult {
    // A comparison operator that combines a column with a literal.
    Operator(PredicateOperator),
    // Logical conjunction of two predicates.
    And,
    // Logical disjunction of two predicates.
    Or,
    // The operator is not supported.
    NotTransformed,
}
40
41/// Converts DataFusion filters ([`Expr`]) to an iceberg [`Predicate`].
42/// If none of the filters could be converted, return `None` which adds no predicates to the scan operation.
43/// If the conversion was successful, return the converted predicates combined with an AND operator.
44pub fn convert_filters_to_predicate(filters: &[Expr]) -> Option<Predicate> {
45    filters
46        .iter()
47        .filter_map(convert_filter_to_predicate)
48        .reduce(Predicate::and)
49}
50
51fn convert_filter_to_predicate(expr: &Expr) -> Option<Predicate> {
52    match to_iceberg_predicate(expr) {
53        TransformedResult::Predicate(predicate) => Some(predicate),
54        TransformedResult::Column(_) | TransformedResult::Literal(_) => {
55            unreachable!("Not a valid expression: {:?}", expr)
56        }
57        _ => None,
58    }
59}
60
61fn to_iceberg_predicate(expr: &Expr) -> TransformedResult {
62    match expr {
63        Expr::BinaryExpr(binary) => {
64            let left = to_iceberg_predicate(&binary.left);
65            let right = to_iceberg_predicate(&binary.right);
66            let op = to_iceberg_operation(binary.op);
67            match op {
68                OpTransformedResult::Operator(op) => to_iceberg_binary_predicate(left, right, op),
69                OpTransformedResult::And => to_iceberg_and_predicate(left, right),
70                OpTransformedResult::Or => to_iceberg_or_predicate(left, right),
71                OpTransformedResult::NotTransformed => TransformedResult::NotTransformed,
72            }
73        }
74        Expr::Not(exp) => {
75            let expr = to_iceberg_predicate(exp);
76            match expr {
77                TransformedResult::Predicate(p) => TransformedResult::Predicate(!p),
78                _ => TransformedResult::NotTransformed,
79            }
80        }
81        Expr::Column(column) => TransformedResult::Column(Reference::new(column.name())),
82        Expr::Literal(literal, _) => match scalar_value_to_datum(literal) {
83            Some(data) => TransformedResult::Literal(data),
84            None => TransformedResult::NotTransformed,
85        },
86        Expr::InList(inlist) => {
87            let mut datums = vec![];
88            for expr in &inlist.list {
89                let p = to_iceberg_predicate(expr);
90                match p {
91                    TransformedResult::Literal(l) => datums.push(l),
92                    _ => return TransformedResult::NotTransformed,
93                }
94            }
95
96            let expr = to_iceberg_predicate(&inlist.expr);
97            match expr {
98                TransformedResult::Column(r) => match inlist.negated {
99                    false => TransformedResult::Predicate(r.is_in(datums)),
100                    true => TransformedResult::Predicate(r.is_not_in(datums)),
101                },
102                _ => TransformedResult::NotTransformed,
103            }
104        }
105        Expr::IsNull(expr) => {
106            let p = to_iceberg_predicate(expr);
107            match p {
108                TransformedResult::Column(r) => TransformedResult::Predicate(Predicate::Unary(
109                    UnaryExpression::new(PredicateOperator::IsNull, r),
110                )),
111                _ => TransformedResult::NotTransformed,
112            }
113        }
114        Expr::IsNotNull(expr) => {
115            let p = to_iceberg_predicate(expr);
116            match p {
117                TransformedResult::Column(r) => TransformedResult::Predicate(Predicate::Unary(
118                    UnaryExpression::new(PredicateOperator::NotNull, r),
119                )),
120                _ => TransformedResult::NotTransformed,
121            }
122        }
123        Expr::Cast(c) => {
124            if c.data_type == DataType::Date32 || c.data_type == DataType::Date64 {
125                // Casts to date truncate the expression, we cannot simply extract it as it
126                // can create erroneous predicates.
127                return TransformedResult::NotTransformed;
128            }
129            to_iceberg_predicate(&c.expr)
130        }
131        _ => TransformedResult::NotTransformed,
132    }
133}
134
135fn to_iceberg_operation(op: Operator) -> OpTransformedResult {
136    match op {
137        Operator::Eq => OpTransformedResult::Operator(PredicateOperator::Eq),
138        Operator::NotEq => OpTransformedResult::Operator(PredicateOperator::NotEq),
139        Operator::Lt => OpTransformedResult::Operator(PredicateOperator::LessThan),
140        Operator::LtEq => OpTransformedResult::Operator(PredicateOperator::LessThanOrEq),
141        Operator::Gt => OpTransformedResult::Operator(PredicateOperator::GreaterThan),
142        Operator::GtEq => OpTransformedResult::Operator(PredicateOperator::GreaterThanOrEq),
143        // AND OR
144        Operator::And => OpTransformedResult::And,
145        Operator::Or => OpTransformedResult::Or,
146        // Others not supported
147        _ => OpTransformedResult::NotTransformed,
148    }
149}
150
151fn to_iceberg_and_predicate(
152    left: TransformedResult,
153    right: TransformedResult,
154) -> TransformedResult {
155    match (left, right) {
156        (TransformedResult::Predicate(left), TransformedResult::Predicate(right)) => {
157            TransformedResult::Predicate(left.and(right))
158        }
159        (TransformedResult::Predicate(left), _) => TransformedResult::Predicate(left),
160        (_, TransformedResult::Predicate(right)) => TransformedResult::Predicate(right),
161        _ => TransformedResult::NotTransformed,
162    }
163}
164
165fn to_iceberg_or_predicate(left: TransformedResult, right: TransformedResult) -> TransformedResult {
166    match (left, right) {
167        (TransformedResult::Predicate(left), TransformedResult::Predicate(right)) => {
168            TransformedResult::Predicate(left.or(right))
169        }
170        _ => TransformedResult::NotTransformed,
171    }
172}
173
174fn to_iceberg_binary_predicate(
175    left: TransformedResult,
176    right: TransformedResult,
177    op: PredicateOperator,
178) -> TransformedResult {
179    let (r, d, op) = match (left, right) {
180        (TransformedResult::NotTransformed, _) => return TransformedResult::NotTransformed,
181        (_, TransformedResult::NotTransformed) => return TransformedResult::NotTransformed,
182        (TransformedResult::Column(r), TransformedResult::Literal(d)) => (r, d, op),
183        (TransformedResult::Literal(d), TransformedResult::Column(r)) => {
184            (r, d, reverse_predicate_operator(op))
185        }
186        _ => return TransformedResult::NotTransformed,
187    };
188    TransformedResult::Predicate(Predicate::Binary(BinaryExpression::new(op, r, d)))
189}
190
191fn reverse_predicate_operator(op: PredicateOperator) -> PredicateOperator {
192    match op {
193        PredicateOperator::Eq => PredicateOperator::Eq,
194        PredicateOperator::NotEq => PredicateOperator::NotEq,
195        PredicateOperator::GreaterThan => PredicateOperator::LessThan,
196        PredicateOperator::GreaterThanOrEq => PredicateOperator::LessThanOrEq,
197        PredicateOperator::LessThan => PredicateOperator::GreaterThan,
198        PredicateOperator::LessThanOrEq => PredicateOperator::GreaterThanOrEq,
199        _ => unreachable!("Reverse {}", op),
200    }
201}
202
203const MILLIS_PER_DAY: i64 = 24 * 60 * 60 * 1000;
204/// Convert a scalar value to an iceberg datum.
205fn scalar_value_to_datum(value: &ScalarValue) -> Option<Datum> {
206    match value {
207        ScalarValue::Int8(Some(v)) => Some(Datum::int(*v as i32)),
208        ScalarValue::Int16(Some(v)) => Some(Datum::int(*v as i32)),
209        ScalarValue::Int32(Some(v)) => Some(Datum::int(*v)),
210        ScalarValue::Int64(Some(v)) => Some(Datum::long(*v)),
211        ScalarValue::Float32(Some(v)) => Some(Datum::double(*v as f64)),
212        ScalarValue::Float64(Some(v)) => Some(Datum::double(*v)),
213        ScalarValue::Utf8(Some(v)) => Some(Datum::string(v.clone())),
214        ScalarValue::LargeUtf8(Some(v)) => Some(Datum::string(v.clone())),
215        ScalarValue::Date32(Some(v)) => Some(Datum::date(*v)),
216        ScalarValue::Date64(Some(v)) => Some(Datum::date((*v / MILLIS_PER_DAY) as i32)),
217        _ => None,
218    }
219}
220
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
    use datafusion::common::DFSchema;
    use datafusion::logical_expr::utils::split_conjunction;
    use datafusion::prelude::{Expr, SessionContext};
    use iceberg::expr::{Predicate, Reference};
    use iceberg::spec::Datum;
    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

    use super::convert_filters_to_predicate;

    /// Builds a nullable Arrow field carrying the given Parquet field id as metadata.
    fn field_with_id(name: &str, data_type: DataType, id: &str) -> Field {
        let metadata = HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), id.to_string())]);
        Field::new(name, data_type, true).with_metadata(metadata)
    }

    /// Schema of `my_table`: `foo` (Int32), `bar` (Utf8) and `ts` (second-precision timestamp).
    fn create_test_schema() -> DFSchema {
        let arrow_schema = Schema::new(vec![
            field_with_id("foo", DataType::Int32, "1"),
            field_with_id("bar", DataType::Utf8, "2"),
            field_with_id("ts", DataType::Timestamp(TimeUnit::Second, None), "3"),
        ]);
        DFSchema::try_from_qualified_schema("my_table", &arrow_schema).unwrap()
    }

    /// Parses `sql` against the test schema, splits it on top-level ANDs and converts
    /// the resulting filters.
    fn convert_to_iceberg_predicate(sql: &str) -> Option<Predicate> {
        let schema = create_test_schema();
        let parsed = SessionContext::new().parse_sql_expr(sql, &schema).unwrap();
        let filters: Vec<Expr> = split_conjunction(&parsed).into_iter().cloned().collect();
        convert_filters_to_predicate(&filters)
    }

    /// Asserts that `sql` converts to exactly `expected`.
    fn assert_converts_to(sql: &str, expected: Predicate) {
        assert_eq!(convert_to_iceberg_predicate(sql), Some(expected));
    }

    /// Asserts that `sql` produces no predicate at all.
    fn assert_not_converted(sql: &str) {
        assert_eq!(convert_to_iceberg_predicate(sql), None);
    }

    #[test]
    fn test_predicate_conversion_with_single_condition() {
        let foo = || Reference::new("foo");

        assert_converts_to("foo = 1", foo().equal_to(Datum::long(1)));
        assert_converts_to("foo != 1", foo().not_equal_to(Datum::long(1)));
        assert_converts_to("foo > 1", foo().greater_than(Datum::long(1)));
        assert_converts_to("foo >= 1", foo().greater_than_or_equal_to(Datum::long(1)));
        assert_converts_to("foo < 1", foo().less_than(Datum::long(1)));
        assert_converts_to("foo <= 1", foo().less_than_or_equal_to(Datum::long(1)));
        assert_converts_to("foo is null", foo().is_null());
        assert_converts_to("foo is not null", foo().is_not_null());
        assert_converts_to(
            "foo in (5, 6)",
            foo().is_in([Datum::long(5), Datum::long(6)]),
        );
        assert_converts_to(
            "foo not in (5, 6)",
            foo().is_not_in([Datum::long(5), Datum::long(6)]),
        );
        assert_converts_to("not foo = 1", !foo().equal_to(Datum::long(1)));
    }

    #[test]
    fn test_predicate_conversion_with_single_unsupported_condition() {
        for sql in ["foo + 1 = 1", "length(bar) = 1", "foo in (1, 2, foo)"] {
            assert_not_converted(sql);
        }
    }

    #[test]
    fn test_predicate_conversion_with_single_condition_rev() {
        assert_converts_to("1 < foo", Reference::new("foo").greater_than(Datum::long(1)));
    }

    #[test]
    fn test_predicate_conversion_with_and_condition() {
        let foo_gt_one = Reference::new("foo").greater_than(Datum::long(1));
        let bar_is_test = Reference::new("bar").equal_to(Datum::string("test"));
        assert_converts_to(
            "foo > 1 and bar = 'test'",
            Predicate::and(foo_gt_one, bar_is_test),
        );
    }

    #[test]
    fn test_predicate_conversion_with_and_condition_unsupported() {
        // Only the convertible half of the conjunction is kept.
        assert_converts_to(
            "foo > 1 and length(bar) = 1",
            Reference::new("foo").greater_than(Datum::long(1)),
        );
    }

    #[test]
    fn test_predicate_conversion_with_and_condition_both_unsupported() {
        assert_not_converted("foo in (1, 2, foo) and length(bar) = 1");
    }

    #[test]
    fn test_predicate_conversion_with_or_condition_unsupported() {
        // A disjunction needs both halves, so nothing is produced here.
        assert_not_converted("foo > 1 or length(bar) = 1");
    }

    #[test]
    fn test_predicate_conversion_with_or_condition_supported() {
        let foo_gt_one = Reference::new("foo").greater_than(Datum::long(1));
        let bar_is_test = Reference::new("bar").equal_to(Datum::string("test"));
        assert_converts_to(
            "foo > 1 or bar = 'test'",
            Predicate::or(foo_gt_one, bar_is_test),
        );
    }

    #[test]
    fn test_predicate_conversion_with_complex_binary_expr() {
        let conjunction = Predicate::and(
            Reference::new("foo").greater_than(Datum::long(1)),
            Reference::new("bar").equal_to(Datum::string("test")),
        );
        let foo_negative = Reference::new("foo").less_than(Datum::long(0));
        assert_converts_to(
            "(foo > 1 and bar = 'test') or foo < 0 ",
            Predicate::or(conjunction, foo_negative),
        );
    }

    #[test]
    fn test_predicate_conversion_with_one_and_expr_supported() {
        let foo_gt_one = Reference::new("foo").greater_than(Datum::long(1));
        let foo_negative = Reference::new("foo").less_than(Datum::long(0));
        assert_converts_to(
            "(foo > 1 and length(bar) = 1 ) or foo < 0 ",
            Predicate::or(foo_gt_one, foo_negative),
        );
    }

    #[test]
    fn test_predicate_conversion_with_complex_binary_expr_unsupported() {
        assert_converts_to(
            "(foo > 1 or length(bar) = 1 ) and foo < 0 ",
            Reference::new("foo").less_than(Datum::long(0)),
        );
    }

    #[test]
    fn test_predicate_conversion_with_cast() {
        assert_converts_to(
            "ts >= timestamp '2023-01-05T00:00:00'",
            Reference::new("ts").greater_than_or_equal_to(Datum::string("2023-01-05T00:00:00")),
        );
    }

    #[test]
    fn test_predicate_conversion_with_date_cast() {
        assert_not_converted("ts >= date '2023-01-05T11:00:00'");
    }
}