1use std::vec;
19
20use datafusion::arrow::datatypes::DataType;
21use datafusion::logical_expr::{Expr, Operator};
22use datafusion::scalar::ScalarValue;
23use iceberg::expr::{BinaryExpression, Predicate, PredicateOperator, Reference, UnaryExpression};
24use iceberg::spec::Datum;
25
26enum TransformedResult {
28 Predicate(Predicate),
29 Column(Reference),
30 Literal(Datum),
31 NotTransformed,
32}
33
34enum OpTransformedResult {
35 Operator(PredicateOperator),
36 And,
37 Or,
38 NotTransformed,
39}
40
41pub fn convert_filters_to_predicate(filters: &[Expr]) -> Option<Predicate> {
45 filters
46 .iter()
47 .filter_map(convert_filter_to_predicate)
48 .reduce(Predicate::and)
49}
50
51fn convert_filter_to_predicate(expr: &Expr) -> Option<Predicate> {
52 match to_iceberg_predicate(expr) {
53 TransformedResult::Predicate(predicate) => Some(predicate),
54 TransformedResult::Column(_) | TransformedResult::Literal(_) => {
55 unreachable!("Not a valid expression: {:?}", expr)
56 }
57 _ => None,
58 }
59}
60
61fn to_iceberg_predicate(expr: &Expr) -> TransformedResult {
62 match expr {
63 Expr::BinaryExpr(binary) => {
64 let left = to_iceberg_predicate(&binary.left);
65 let right = to_iceberg_predicate(&binary.right);
66 let op = to_iceberg_operation(binary.op);
67 match op {
68 OpTransformedResult::Operator(op) => to_iceberg_binary_predicate(left, right, op),
69 OpTransformedResult::And => to_iceberg_and_predicate(left, right),
70 OpTransformedResult::Or => to_iceberg_or_predicate(left, right),
71 OpTransformedResult::NotTransformed => TransformedResult::NotTransformed,
72 }
73 }
74 Expr::Not(exp) => {
75 let expr = to_iceberg_predicate(exp);
76 match expr {
77 TransformedResult::Predicate(p) => TransformedResult::Predicate(!p),
78 _ => TransformedResult::NotTransformed,
79 }
80 }
81 Expr::Column(column) => TransformedResult::Column(Reference::new(column.name())),
82 Expr::Literal(literal, _) => match scalar_value_to_datum(literal) {
83 Some(data) => TransformedResult::Literal(data),
84 None => TransformedResult::NotTransformed,
85 },
86 Expr::InList(inlist) => {
87 let mut datums = vec![];
88 for expr in &inlist.list {
89 let p = to_iceberg_predicate(expr);
90 match p {
91 TransformedResult::Literal(l) => datums.push(l),
92 _ => return TransformedResult::NotTransformed,
93 }
94 }
95
96 let expr = to_iceberg_predicate(&inlist.expr);
97 match expr {
98 TransformedResult::Column(r) => match inlist.negated {
99 false => TransformedResult::Predicate(r.is_in(datums)),
100 true => TransformedResult::Predicate(r.is_not_in(datums)),
101 },
102 _ => TransformedResult::NotTransformed,
103 }
104 }
105 Expr::IsNull(expr) => {
106 let p = to_iceberg_predicate(expr);
107 match p {
108 TransformedResult::Column(r) => TransformedResult::Predicate(Predicate::Unary(
109 UnaryExpression::new(PredicateOperator::IsNull, r),
110 )),
111 _ => TransformedResult::NotTransformed,
112 }
113 }
114 Expr::IsNotNull(expr) => {
115 let p = to_iceberg_predicate(expr);
116 match p {
117 TransformedResult::Column(r) => TransformedResult::Predicate(Predicate::Unary(
118 UnaryExpression::new(PredicateOperator::NotNull, r),
119 )),
120 _ => TransformedResult::NotTransformed,
121 }
122 }
123 Expr::Cast(c) => {
124 if c.data_type == DataType::Date32 || c.data_type == DataType::Date64 {
125 return TransformedResult::NotTransformed;
128 }
129 to_iceberg_predicate(&c.expr)
130 }
131 _ => TransformedResult::NotTransformed,
132 }
133}
134
135fn to_iceberg_operation(op: Operator) -> OpTransformedResult {
136 match op {
137 Operator::Eq => OpTransformedResult::Operator(PredicateOperator::Eq),
138 Operator::NotEq => OpTransformedResult::Operator(PredicateOperator::NotEq),
139 Operator::Lt => OpTransformedResult::Operator(PredicateOperator::LessThan),
140 Operator::LtEq => OpTransformedResult::Operator(PredicateOperator::LessThanOrEq),
141 Operator::Gt => OpTransformedResult::Operator(PredicateOperator::GreaterThan),
142 Operator::GtEq => OpTransformedResult::Operator(PredicateOperator::GreaterThanOrEq),
143 Operator::And => OpTransformedResult::And,
145 Operator::Or => OpTransformedResult::Or,
146 _ => OpTransformedResult::NotTransformed,
148 }
149}
150
151fn to_iceberg_and_predicate(
152 left: TransformedResult,
153 right: TransformedResult,
154) -> TransformedResult {
155 match (left, right) {
156 (TransformedResult::Predicate(left), TransformedResult::Predicate(right)) => {
157 TransformedResult::Predicate(left.and(right))
158 }
159 (TransformedResult::Predicate(left), _) => TransformedResult::Predicate(left),
160 (_, TransformedResult::Predicate(right)) => TransformedResult::Predicate(right),
161 _ => TransformedResult::NotTransformed,
162 }
163}
164
165fn to_iceberg_or_predicate(left: TransformedResult, right: TransformedResult) -> TransformedResult {
166 match (left, right) {
167 (TransformedResult::Predicate(left), TransformedResult::Predicate(right)) => {
168 TransformedResult::Predicate(left.or(right))
169 }
170 _ => TransformedResult::NotTransformed,
171 }
172}
173
174fn to_iceberg_binary_predicate(
175 left: TransformedResult,
176 right: TransformedResult,
177 op: PredicateOperator,
178) -> TransformedResult {
179 let (r, d, op) = match (left, right) {
180 (TransformedResult::NotTransformed, _) => return TransformedResult::NotTransformed,
181 (_, TransformedResult::NotTransformed) => return TransformedResult::NotTransformed,
182 (TransformedResult::Column(r), TransformedResult::Literal(d)) => (r, d, op),
183 (TransformedResult::Literal(d), TransformedResult::Column(r)) => {
184 (r, d, reverse_predicate_operator(op))
185 }
186 _ => return TransformedResult::NotTransformed,
187 };
188 TransformedResult::Predicate(Predicate::Binary(BinaryExpression::new(op, r, d)))
189}
190
191fn reverse_predicate_operator(op: PredicateOperator) -> PredicateOperator {
192 match op {
193 PredicateOperator::Eq => PredicateOperator::Eq,
194 PredicateOperator::NotEq => PredicateOperator::NotEq,
195 PredicateOperator::GreaterThan => PredicateOperator::LessThan,
196 PredicateOperator::GreaterThanOrEq => PredicateOperator::LessThanOrEq,
197 PredicateOperator::LessThan => PredicateOperator::GreaterThan,
198 PredicateOperator::LessThanOrEq => PredicateOperator::GreaterThanOrEq,
199 _ => unreachable!("Reverse {}", op),
200 }
201}
202
203const MILLIS_PER_DAY: i64 = 24 * 60 * 60 * 1000;
204fn scalar_value_to_datum(value: &ScalarValue) -> Option<Datum> {
206 match value {
207 ScalarValue::Int8(Some(v)) => Some(Datum::int(*v as i32)),
208 ScalarValue::Int16(Some(v)) => Some(Datum::int(*v as i32)),
209 ScalarValue::Int32(Some(v)) => Some(Datum::int(*v)),
210 ScalarValue::Int64(Some(v)) => Some(Datum::long(*v)),
211 ScalarValue::Float32(Some(v)) => Some(Datum::double(*v as f64)),
212 ScalarValue::Float64(Some(v)) => Some(Datum::double(*v)),
213 ScalarValue::Utf8(Some(v)) => Some(Datum::string(v.clone())),
214 ScalarValue::LargeUtf8(Some(v)) => Some(Datum::string(v.clone())),
215 ScalarValue::Date32(Some(v)) => Some(Datum::date(*v)),
216 ScalarValue::Date64(Some(v)) => Some(Datum::date((*v / MILLIS_PER_DAY) as i32)),
217 _ => None,
218 }
219}
220
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
    use datafusion::common::DFSchema;
    use datafusion::logical_expr::utils::split_conjunction;
    use datafusion::prelude::{Expr, SessionContext};
    use iceberg::expr::{Predicate, Reference};
    use iceberg::spec::Datum;
    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

    use super::convert_filters_to_predicate;

    /// Builds a nullable Arrow field tagged with the given Parquet field id.
    fn field_with_id(name: &str, data_type: DataType, id: &str) -> Field {
        Field::new(name, data_type, true).with_metadata(HashMap::from([(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            id.to_string(),
        )]))
    }

    /// Schema of `my_table`: `foo` (Int32), `bar` (Utf8), `ts` (Timestamp in seconds).
    fn create_test_schema() -> DFSchema {
        let arrow_schema = Schema::new(vec![
            field_with_id("foo", DataType::Int32, "1"),
            field_with_id("bar", DataType::Utf8, "2"),
            field_with_id("ts", DataType::Timestamp(TimeUnit::Second, None), "3"),
        ]);
        DFSchema::try_from_qualified_schema("my_table", &arrow_schema).unwrap()
    }

    /// Parses `sql` against the test schema, splits it on top-level `AND`, and
    /// converts the resulting filters.
    fn convert_to_iceberg_predicate(sql: &str) -> Option<Predicate> {
        let df_schema = create_test_schema();
        let parsed = SessionContext::new()
            .parse_sql_expr(sql, &df_schema)
            .unwrap();
        let filters: Vec<Expr> = split_conjunction(&parsed).into_iter().cloned().collect();
        convert_filters_to_predicate(&filters)
    }

    /// Asserts that `sql` converts to exactly `expected`.
    fn assert_converts_to(sql: &str, expected: Predicate) {
        assert_eq!(convert_to_iceberg_predicate(sql), Some(expected), "sql: {sql}");
    }

    /// Asserts that nothing from `sql` can be pushed down.
    fn assert_not_converted(sql: &str) {
        assert_eq!(convert_to_iceberg_predicate(sql), None, "sql: {sql}");
    }

    fn foo() -> Reference {
        Reference::new("foo")
    }

    #[test]
    fn test_predicate_conversion_with_single_condition() {
        let cases = [
            ("foo = 1", foo().equal_to(Datum::long(1))),
            ("foo != 1", foo().not_equal_to(Datum::long(1))),
            ("foo > 1", foo().greater_than(Datum::long(1))),
            ("foo >= 1", foo().greater_than_or_equal_to(Datum::long(1))),
            ("foo < 1", foo().less_than(Datum::long(1))),
            ("foo <= 1", foo().less_than_or_equal_to(Datum::long(1))),
            ("foo is null", foo().is_null()),
            ("foo is not null", foo().is_not_null()),
            (
                "foo in (5, 6)",
                foo().is_in([Datum::long(5), Datum::long(6)]),
            ),
            (
                "foo not in (5, 6)",
                foo().is_not_in([Datum::long(5), Datum::long(6)]),
            ),
            ("not foo = 1", !foo().equal_to(Datum::long(1))),
        ];
        for (sql, expected) in cases {
            assert_converts_to(sql, expected);
        }
    }

    #[test]
    fn test_predicate_conversion_with_single_unsupported_condition() {
        for sql in ["foo + 1 = 1", "length(bar) = 1", "foo in (1, 2, foo)"] {
            assert_not_converted(sql);
        }
    }

    #[test]
    fn test_predicate_conversion_with_single_condition_rev() {
        assert_converts_to("1 < foo", foo().greater_than(Datum::long(1)));
    }

    #[test]
    fn test_predicate_conversion_with_and_condition() {
        let expected = foo()
            .greater_than(Datum::long(1))
            .and(Reference::new("bar").equal_to(Datum::string("test")));
        assert_converts_to("foo > 1 and bar = 'test'", expected);
    }

    #[test]
    fn test_predicate_conversion_with_and_condition_unsupported() {
        assert_converts_to(
            "foo > 1 and length(bar) = 1",
            foo().greater_than(Datum::long(1)),
        );
    }

    #[test]
    fn test_predicate_conversion_with_and_condition_both_unsupported() {
        assert_not_converted("foo in (1, 2, foo) and length(bar) = 1");
    }

    #[test]
    fn test_predicate_conversion_with_or_condition_unsupported() {
        assert_not_converted("foo > 1 or length(bar) = 1");
    }

    #[test]
    fn test_predicate_conversion_with_or_condition_supported() {
        let expected = foo()
            .greater_than(Datum::long(1))
            .or(Reference::new("bar").equal_to(Datum::string("test")));
        assert_converts_to("foo > 1 or bar = 'test'", expected);
    }

    #[test]
    fn test_predicate_conversion_with_complex_binary_expr() {
        let conjunction = foo()
            .greater_than(Datum::long(1))
            .and(Reference::new("bar").equal_to(Datum::string("test")));
        let expected = conjunction.or(foo().less_than(Datum::long(0)));
        assert_converts_to("(foo > 1 and bar = 'test') or foo < 0 ", expected);
    }

    #[test]
    fn test_predicate_conversion_with_one_and_expr_supported() {
        let expected = foo()
            .greater_than(Datum::long(1))
            .or(foo().less_than(Datum::long(0)));
        assert_converts_to("(foo > 1 and length(bar) = 1 ) or foo < 0 ", expected);
    }

    #[test]
    fn test_predicate_conversion_with_complex_binary_expr_unsupported() {
        assert_converts_to(
            "(foo > 1 or length(bar) = 1 ) and foo < 0 ",
            foo().less_than(Datum::long(0)),
        );
    }

    #[test]
    fn test_predicate_conversion_with_cast() {
        let expected =
            Reference::new("ts").greater_than_or_equal_to(Datum::string("2023-01-05T00:00:00"));
        assert_converts_to("ts >= timestamp '2023-01-05T00:00:00'", expected);
    }

    #[test]
    fn test_predicate_conversion_with_date_cast() {
        assert_not_converted("ts >= date '2023-01-05T11:00:00'");
    }
}