// iceberg_datafusion/physical_plan/scan.rs

use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::vec;

use datafusion::arrow::array::RecordBatch;
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datafusion::error::Result as DFResult;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
use datafusion::prelude::Expr;
use futures::{Stream, TryStreamExt};
use iceberg::expr::Predicate;
use iceberg::table::Table;

use super::expr_to_predicate::convert_filters_to_predicate;
use crate::to_datafusion_error;
#[derive(Debug)]
pub struct IcebergTableScan {
    /// The Iceberg table to scan.
    table: Table,
    /// ID of the snapshot to scan; when `None`, the current snapshot is used.
    snapshot_id: Option<i64>,
    /// Plan properties (schema, partitioning, emission) reported to DataFusion.
    plan_properties: PlanProperties,
    /// Names of the projected columns, if a projection was pushed down.
    projection: Option<Vec<String>>,
    /// Filter predicate pushed down to the Iceberg scan, if any.
    predicates: Option<Predicate>,
    /// Maximum number of rows to return, if a limit was pushed down.
    limit: Option<usize>,
}

impl IcebergTableScan {
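    /// Creates a new [`IcebergTableScan`] from the table, an optional snapshot
    /// id, the table's Arrow schema, and the projection, filters, and limit
    /// pushed down from DataFusion.
    ///
    /// A minimal usage sketch (the `table`, `schema`, and `ctx` values are
    /// placeholders; constructing a live `Table` requires a catalog, so this
    /// is not a runnable doctest):
    ///
    /// ```ignore
    /// let scan = IcebergTableScan::new(table, None, schema, None, &[], Some(100));
    /// let stream = scan.execute(0, ctx)?;
    /// ```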
    pub(crate) fn new(
        table: Table,
        snapshot_id: Option<i64>,
        schema: ArrowSchemaRef,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Self {
        // Projection indices come from DataFusion and are expected to be valid
        // for this schema, so `project` is not expected to fail here.
        let output_schema = match projection {
            None => schema.clone(),
            Some(projection) => Arc::new(schema.project(projection).unwrap()),
        };
        let plan_properties = Self::compute_properties(output_schema.clone());
        let projection = get_column_names(schema.clone(), projection);
        let predicates = convert_filters_to_predicate(filters);

        Self {
            table,
            snapshot_id,
            plan_properties,
            projection,
            predicates,
            limit,
        }
    }

    pub fn table(&self) -> &Table {
        &self.table
    }

    pub fn snapshot_id(&self) -> Option<i64> {
        self.snapshot_id
    }

    pub fn projection(&self) -> Option<&[String]> {
        self.projection.as_deref()
    }

    pub fn predicates(&self) -> Option<&Predicate> {
        self.predicates.as_ref()
    }

    pub fn limit(&self) -> Option<usize> {
        self.limit
    }

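    /// Computes the [`PlanProperties`] advertised to the DataFusion optimizer.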
    fn compute_properties(schema: ArrowSchemaRef) -> PlanProperties {
        // The scan is currently reported as a single unknown partition that
        // emits batches incrementally from a bounded input.
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}

impl ExecutionPlan for IcebergTableScan {
    fn name(&self) -> &str {
        "IcebergTableScan"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn children(&self) -> Vec<&Arc<(dyn ExecutionPlan + 'static)>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        // This is a leaf node, so there are no children to replace.
        Ok(self)
    }

    fn properties(&self) -> &PlanProperties {
        &self.plan_properties
    }

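    // The record-batch stream is built lazily: the async Iceberg scan only
    // starts when the stream is first polled, and any pushed-down limit is
    // enforced by truncating batches as they flow through.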
    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let fut = get_batch_stream(
            self.table.clone(),
            self.snapshot_id,
            self.projection.clone(),
            self.predicates.clone(),
        );
        // `once(fut).try_flatten()` defers the async scan construction until
        // the stream is polled, then yields the inner stream's batches.
        let stream = futures::stream::once(fut).try_flatten();

        let limited_stream: Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>> =
            if let Some(limit) = self.limit {
                let mut remaining = limit;
                Box::pin(stream.try_filter_map(move |batch| {
                    futures::future::ready(if remaining == 0 {
                        // The limit has been reached; drop any further batches.
                        Ok(None)
                    } else if batch.num_rows() <= remaining {
                        // The whole batch fits within the remaining budget.
                        remaining -= batch.num_rows();
                        Ok(Some(batch))
                    } else {
                        // Only part of the batch fits; slice it and stop.
                        let limited_batch = batch.slice(0, remaining);
                        remaining = 0;
                        Ok(Some(limited_batch))
                    })
                }))
            } else {
                Box::pin(stream)
            };

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            self.schema(),
            limited_stream,
        )))
    }
}

impl DisplayAs for IcebergTableScan {
    fn fmt_as(
        &self,
        _t: datafusion::physical_plan::DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        write!(
            f,
            "IcebergTableScan projection:[{}] predicate:[{}]",
            self.projection
                .clone()
                .map_or(String::new(), |v| v.join(",")),
            self.predicates
                .clone()
                .map_or(String::new(), |p| format!("{p}"))
        )
    }
}

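/// Asynchronously builds a stream of [`RecordBatch`]es from the given table.
///
/// The scan targets the requested snapshot (or the current one when
/// `snapshot_id` is `None`), selects only the projected columns when a
/// projection is present, and applies the pushed-down predicate if any.
/// Errors from building the scan or from individual batches are mapped into
/// DataFusion errors.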
async fn get_batch_stream(
    table: Table,
    snapshot_id: Option<i64>,
    column_names: Option<Vec<String>>,
    predicates: Option<Predicate>,
) -> DFResult<Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>> {
    let scan_builder = match snapshot_id {
        Some(snapshot_id) => table.scan().snapshot_id(snapshot_id),
        None => table.scan(),
    };

    let mut scan_builder = match column_names {
        Some(column_names) => scan_builder.select(column_names),
        None => scan_builder.select_all(),
    };
    if let Some(pred) = predicates {
        scan_builder = scan_builder.with_filter(pred);
    }
    let table_scan = scan_builder.build().map_err(to_datafusion_error)?;

    let stream = table_scan
        .to_arrow()
        .await
        .map_err(to_datafusion_error)?
        .map_err(to_datafusion_error);
    Ok(Box::pin(stream))
}

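/// Resolves projected column indices to the corresponding field names in the
/// Arrow schema.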
fn get_column_names(
    schema: ArrowSchemaRef,
    projection: Option<&Vec<usize>>,
) -> Option<Vec<String>> {
    projection.map(|v| {
        v.iter()
            .map(|p| schema.field(*p).name().clone())
            .collect::<Vec<String>>()
    })
}