iceberg_datafusion/physical_plan/scan.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;

use datafusion::arrow::array::RecordBatch;
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datafusion::error::Result as DFResult;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
use datafusion::prelude::Expr;
use futures::{Stream, TryStreamExt};
use iceberg::expr::Predicate;
use iceberg::table::Table;

use super::expr_to_predicate::convert_filters_to_predicate;
use crate::to_datafusion_error;

/// Manages the scanning process of an Iceberg [`Table`], encapsulating the
/// necessary details and computed properties required for execution planning.
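///
/// An `IcebergTableScan` is normally produced by this crate's `TableProvider`
/// implementation during DataFusion query planning rather than constructed
/// directly. A rough usage sketch (not compiled; the `provider` value is
/// assumed to be an Iceberg-backed `Arc<dyn TableProvider>` from this crate):
///
/// ```ignore
/// let ctx = datafusion::prelude::SessionContext::new();
/// // `provider` is an Iceberg-backed table provider (assumed).
/// ctx.register_table("my_table", provider)?;
/// // Planning this query yields an `IcebergTableScan` carrying the
/// // projection, any pushed-down predicates, and the limit.
/// let df = ctx.sql("SELECT id, name FROM my_table WHERE id > 5 LIMIT 10").await?;
/// ```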
#[derive(Debug)]
pub struct IcebergTableScan {
    /// A table in the catalog.
    table: Table,
    /// ID of the snapshot to scan; `None` means the table's current snapshot.
    snapshot_id: Option<i64>,
    /// Plan properties used in query optimization, some of which are
    /// expensive to compute and therefore cached here.
    plan_properties: PlanProperties,
    /// Projected column names; `None` means all columns.
    projection: Option<Vec<String>>,
    /// Filters to apply to the table scan.
    predicates: Option<Predicate>,
    /// Optional limit on the number of rows to return.
    limit: Option<usize>,
}

impl IcebergTableScan {
    /// Creates a new [`IcebergTableScan`] object.
    ///
    /// Any indices in `projection` must be valid for `schema`;
    /// an out-of-bounds index causes a panic.
    pub(crate) fn new(
        table: Table,
        snapshot_id: Option<i64>,
        schema: ArrowSchemaRef,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Self {
        let output_schema = match projection {
            None => schema.clone(),
            Some(projection) => Arc::new(
                schema
                    .project(projection)
                    .expect("projection indices must be valid for the schema"),
            ),
        };
        let plan_properties = Self::compute_properties(output_schema.clone());
        let projection = get_column_names(schema.clone(), projection);
        let predicates = convert_filters_to_predicate(filters);

        Self {
            table,
            snapshot_id,
            plan_properties,
            projection,
            predicates,
            limit,
        }
    }

    /// Returns the table being scanned.
    pub fn table(&self) -> &Table {
        &self.table
    }

    /// Returns the ID of the snapshot to scan, if one was specified.
    pub fn snapshot_id(&self) -> Option<i64> {
        self.snapshot_id
    }

    /// Returns the projected column names; `None` means all columns.
    pub fn projection(&self) -> Option<&[String]> {
        self.projection.as_deref()
    }

    /// Returns the predicate pushed down into the scan, if any.
    pub fn predicates(&self) -> Option<&Predicate> {
        self.predicates.as_ref()
    }

    /// Returns the row limit applied to the scan, if any.
    pub fn limit(&self) -> Option<usize> {
        self.limit
    }

    /// Computes [`PlanProperties`] used in query optimization.
    fn compute_properties(schema: ArrowSchemaRef) -> PlanProperties {
        // TODO: these are placeholder values, to be replaced
        // once output partitioning is supported.
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}

impl ExecutionPlan for IcebergTableScan {
    fn name(&self) -> &str {
        "IcebergTableScan"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        // This is a leaf node; there are no children to replace.
        Ok(self)
    }

    fn properties(&self) -> &PlanProperties {
        &self.plan_properties
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let fut = get_batch_stream(
            self.table.clone(),
            self.snapshot_id,
            self.projection.clone(),
            self.predicates.clone(),
        );
        let stream = futures::stream::once(fut).try_flatten();

        // Apply the limit, if specified, by truncating the stream of batches.
        let limited_stream: Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>> =
            if let Some(limit) = self.limit {
                let mut remaining = limit;
                Box::pin(stream.try_filter_map(move |batch| {
                    futures::future::ready(if remaining == 0 {
                        // The limit has been reached: drop all remaining batches.
                        Ok(None)
                    } else if batch.num_rows() <= remaining {
                        // The whole batch fits within the remaining limit.
                        remaining -= batch.num_rows();
                        Ok(Some(batch))
                    } else {
                        // The batch crosses the limit boundary: keep only
                        // the first `remaining` rows.
                        let limited_batch = batch.slice(0, remaining);
                        remaining = 0;
                        Ok(Some(limited_batch))
                    })
                }))
            } else {
                Box::pin(stream)
            };

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            self.schema(),
            limited_stream,
        )))
    }
}

impl DisplayAs for IcebergTableScan {
    fn fmt_as(
        &self,
        _t: datafusion::physical_plan::DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        write!(
            f,
            "IcebergTableScan projection:[{}] predicate:[{}]",
            self.projection
                .as_ref()
                .map_or(String::new(), |v| v.join(",")),
            self.predicates
                .as_ref()
                .map_or(String::new(), |p| p.to_string())
        )
    }
}

/// Asynchronously retrieves a stream of [`RecordBatch`] instances
/// from a given table.
///
/// This function initializes a [`TableScan`], builds it,
/// and then converts it into a stream of Arrow [`RecordBatch`]es.
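///
/// Errors from building the scan and errors surfaced by the underlying
/// Arrow stream are both mapped into DataFusion errors via
/// [`to_datafusion_error`].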
async fn get_batch_stream(
    table: Table,
    snapshot_id: Option<i64>,
    column_names: Option<Vec<String>>,
    predicates: Option<Predicate>,
) -> DFResult<Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>> {
    // Scan the requested snapshot, or the current one if none was given.
    let scan_builder = match snapshot_id {
        Some(snapshot_id) => table.scan().snapshot_id(snapshot_id),
        None => table.scan(),
    };

    // Restrict the scan to the projected columns, if any.
    let mut scan_builder = match column_names {
        Some(column_names) => scan_builder.select(column_names),
        None => scan_builder.select_all(),
    };
    if let Some(pred) = predicates {
        scan_builder = scan_builder.with_filter(pred);
    }
    let table_scan = scan_builder.build().map_err(to_datafusion_error)?;

    let stream = table_scan
        .to_arrow()
        .await
        .map_err(to_datafusion_error)?
        .map_err(to_datafusion_error);
    Ok(Box::pin(stream))
}

/// Maps the projected column indices to the corresponding column names
/// in `schema`; `None` means all columns are selected.
fn get_column_names(
    schema: ArrowSchemaRef,
    projection: Option<&Vec<usize>>,
) -> Option<Vec<String>> {
    projection.map(|v| {
        v.iter()
            .map(|p| schema.field(*p).name().clone())
            .collect::<Vec<String>>()
    })
}
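
// A minimal test sketch (not part of the original file) covering the pure
// helper logic above; the schemas and data here are made up for illustration.
#[cfg(test)]
mod tests {
    use datafusion::arrow::array::{ArrayRef, Int32Array};
    use datafusion::arrow::datatypes::{DataType, Field, Schema};

    use super::*;

    #[test]
    fn get_column_names_maps_indices_to_names() {
        // Hypothetical three-column schema; project the first and last columns.
        let schema: ArrowSchemaRef = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, false),
        ]));
        let projection = vec![0, 2];
        assert_eq!(
            get_column_names(schema.clone(), Some(&projection)),
            Some(vec!["a".to_string(), "c".to_string()])
        );
        // No projection means all columns, represented as `None`.
        assert_eq!(get_column_names(schema, None), None);
    }

    #[test]
    fn limit_truncates_batch_at_boundary() {
        // `execute` truncates the batch that crosses the limit boundary with
        // `RecordBatch::slice`; verify that slicing semantics here.
        let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
        let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
        let batch = RecordBatch::try_new(schema, vec![column]).unwrap();
        let limited = batch.slice(0, 3);
        assert_eq!(limited.num_rows(), 3);
    }
}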