use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use datafusion::arrow::array::RecordBatch;
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datafusion::error::Result as DFResult;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
use datafusion::prelude::Expr;
use futures::{Stream, TryStreamExt};
use iceberg::expr::Predicate;
use iceberg::table::Table;
use super::expr_to_predicate::convert_filters_to_predicate;
use crate::to_datafusion_error;
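/// Manages the scanning process of an Iceberg [`Table`], encapsulating the
/// details and computed plan properties required for execution planning.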
#[derive(Debug)]
pub(crate) struct IcebergTableScan {
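    /// The Iceberg table to scan.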
table: Table,
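    /// Snapshot of the table to scan, or `None` for the current snapshot.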
snapshot_id: Option<i64>,
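    /// Plan properties (often expensive to compute) used in query optimization.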
plan_properties: PlanProperties,
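    /// Projected column names; `None` means all columns.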
projection: Option<Vec<String>>,
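    /// Filters to push down into the table scan.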
predicates: Option<Predicate>,
}
impl IcebergTableScan {
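    /// Creates a new [`IcebergTableScan`] from the table, an optional snapshot,
    /// the table's Arrow schema, an optional projection, and pushed-down filters.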
pub(crate) fn new(
table: Table,
snapshot_id: Option<i64>,
schema: ArrowSchemaRef,
projection: Option<&Vec<usize>>,
filters: &[Expr],
) -> Self {
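        // The output schema is the full table schema, or the projected subset.
        // Projection indices are produced by DataFusion against this same schema
        // and are expected to be in range, hence the unwrap.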
let output_schema = match projection {
None => schema.clone(),
Some(projection) => Arc::new(schema.project(projection).unwrap()),
};
let plan_properties = Self::compute_properties(output_schema.clone());
let projection = get_column_names(schema.clone(), projection);
let predicates = convert_filters_to_predicate(filters);
Self {
table,
snapshot_id,
plan_properties,
projection,
predicates,
}
}
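    /// Computes [`PlanProperties`] used in query optimization.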
fn compute_properties(schema: ArrowSchemaRef) -> PlanProperties {
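        // The scan is reported as a single partition of unknown distribution
        // that emits record batches incrementally and is bounded.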
PlanProperties::new(
EquivalenceProperties::new(schema),
Partitioning::UnknownPartitioning(1),
EmissionType::Incremental,
Boundedness::Bounded,
)
}
}
impl ExecutionPlan for IcebergTableScan {
fn name(&self) -> &str {
"IcebergTableScan"
}
fn as_any(&self) -> &dyn Any {
self
}
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
Ok(self)
}
fn properties(&self) -> &PlanProperties {
&self.plan_properties
}
fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
) -> DFResult<SendableRecordBatchStream> {
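        // Defer the async scan construction until the stream is first polled,
        // then flatten the future's inner batch stream into the output stream.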
let fut = get_batch_stream(
self.table.clone(),
self.snapshot_id,
self.projection.clone(),
self.predicates.clone(),
);
let stream = futures::stream::once(fut).try_flatten();
Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
stream,
)))
}
}
impl DisplayAs for IcebergTableScan {
fn fmt_as(
&self,
_t: datafusion::physical_plan::DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
write!(
f,
"IcebergTableScan projection:[{}] predicate:[{}]",
self.projection
.clone()
.map_or(String::new(), |v| v.join(",")),
            self.predicates
                .clone()
                .map_or(String::new(), |p| p.to_string())
)
}
}
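/// Asynchronously builds a stream of [`RecordBatch`]es from a table scan.
///
/// Configures an Iceberg table scan with the given snapshot, projection, and
/// predicate, builds it, and converts the result into an Arrow record-batch
/// stream, mapping Iceberg errors into DataFusion errors.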
async fn get_batch_stream(
table: Table,
snapshot_id: Option<i64>,
column_names: Option<Vec<String>>,
predicates: Option<Predicate>,
) -> DFResult<Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>> {
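    // Pin the scan to the requested snapshot, or use the current one.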
let scan_builder = match snapshot_id {
Some(snapshot_id) => table.scan().snapshot_id(snapshot_id),
None => table.scan(),
};
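    // Select only the projected columns, or all columns if there is no projection.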
let mut scan_builder = match column_names {
Some(column_names) => scan_builder.select(column_names),
None => scan_builder.select_all(),
};
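    // Push the converted DataFusion filters down into the Iceberg scan.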
if let Some(pred) = predicates {
scan_builder = scan_builder.with_filter(pred);
}
let table_scan = scan_builder.build().map_err(to_datafusion_error)?;
let stream = table_scan
.to_arrow()
.await
.map_err(to_datafusion_error)?
.map_err(to_datafusion_error);
Ok(Box::pin(stream))
}
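/// Maps projected column indices to the corresponding field names in the schema.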
fn get_column_names(
schema: ArrowSchemaRef,
projection: Option<&Vec<usize>>,
) -> Option<Vec<String>> {
projection.map(|v| {
v.iter()
.map(|p| schema.field(*p).name().clone())
.collect::<Vec<String>>()
})
}