use std::sync::Arc;
use futures::channel::mpsc::Sender;
use futures::{SinkExt, TryFutureExt};
use crate::delete_file_index::DeleteFileIndex;
use crate::expr::{Bind, BoundPredicate, Predicate};
use crate::io::object_cache::ObjectCache;
use crate::scan::{
BoundPredicates, ExpressionEvaluatorCache, FileScanTask, ManifestEvaluatorCache,
PartitionFilterCache,
};
use crate::spec::{
ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, SchemaRef, SnapshotRef,
TableMetadataRef,
};
use crate::{Error, ErrorKind, Result};
/// Everything needed to load one manifest file and fan its entries out as
/// [`ManifestEntryContext`]s.
pub(crate) struct ManifestFileContext {
    /// The manifest whose entries will be streamed.
    manifest_file: ManifestFile,
    /// Cache through which the manifest is loaded.
    object_cache: Arc<ObjectCache>,
    /// Channel that receives one [`ManifestEntryContext`] per manifest entry.
    sender: Sender<ManifestEntryContext>,

    // The remaining fields are copied into every emitted `ManifestEntryContext`.
    /// Field ids projected by the scan.
    field_ids: Arc<Vec<i32>>,
    /// Partition- and snapshot-level bound predicates, when the scan is filtered.
    bound_predicates: Option<Arc<BoundPredicates>>,
    /// Schema of the snapshot being scanned.
    snapshot_schema: SchemaRef,
    /// Shared expression-evaluator cache handed on to each entry context.
    expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
    /// Index used to look up delete files for data files, if one was built.
    delete_file_index: Option<DeleteFileIndex>,
}
/// A single manifest entry together with the scan state needed to turn it into
/// a [`FileScanTask`].
pub(crate) struct ManifestEntryContext {
    /// The manifest entry this context describes.
    pub manifest_entry: ManifestEntryRef,
    /// Partition spec id of the manifest file the entry was read from.
    pub partition_spec_id: i32,
    /// Schema of the snapshot being scanned; becomes the task's schema.
    pub snapshot_schema: SchemaRef,
    /// Field ids projected by the scan; become the task's projected field ids.
    pub field_ids: Arc<Vec<i32>>,
    /// Partition- and snapshot-level bound predicates, when the scan is filtered.
    pub bound_predicates: Option<Arc<BoundPredicates>>,
    /// Shared expression-evaluator cache.
    /// NOTE(review): presumably used by consumers to evaluate `bound_predicates`
    /// against this entry — confirm at the use site.
    pub expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
    /// Index used to find delete files that apply to this entry's data file.
    pub delete_file_index: Option<DeleteFileIndex>,
}
impl ManifestFileContext {
pub(crate) async fn fetch_manifest_and_stream_manifest_entries(self) -> Result<()> {
let ManifestFileContext {
object_cache,
manifest_file,
bound_predicates,
snapshot_schema,
field_ids,
mut sender,
expression_evaluator_cache,
delete_file_index,
..
} = self;
let manifest = object_cache.get_manifest(&manifest_file).await?;
for manifest_entry in manifest.entries() {
let manifest_entry_context = ManifestEntryContext {
manifest_entry: manifest_entry.clone(),
expression_evaluator_cache: expression_evaluator_cache.clone(),
field_ids: field_ids.clone(),
partition_spec_id: manifest_file.partition_spec_id,
bound_predicates: bound_predicates.clone(),
snapshot_schema: snapshot_schema.clone(),
delete_file_index: delete_file_index.clone(),
};
sender
.send(manifest_entry_context)
.map_err(|_| Error::new(ErrorKind::Unexpected, "mpsc channel SendError"))
.await?;
}
Ok(())
}
}
impl ManifestEntryContext {
    /// Consumes this context and builds a [`FileScanTask`] spanning the whole data
    /// file (from byte 0 to the entry's file size).
    ///
    /// If a delete file index is attached, the delete files it returns for this
    /// entry's data file and sequence number are attached to the task. Otherwise
    /// the task has no deletes.
    ///
    /// # Errors
    ///
    /// Propagates any error from the delete file index lookup.
    pub(crate) async fn into_file_scan_task(self) -> Result<FileScanTask> {
        let entry = self.manifest_entry;

        let deletes = match self.delete_file_index {
            Some(index) => {
                index
                    .get_deletes_for_data_file(entry.data_file(), entry.sequence_number())
                    .await?
            }
            None => vec![],
        };

        // Only the snapshot-level half of the bound predicates travels with the task.
        let predicate = self
            .bound_predicates
            .map(|preds| preds.snapshot_bound_predicate.clone());

        Ok(FileScanTask {
            start: 0,
            length: entry.file_size_in_bytes(),
            record_count: Some(entry.record_count()),
            data_file_path: entry.file_path().to_string(),
            data_file_content: entry.content_type(),
            data_file_format: entry.file_format(),
            schema: self.snapshot_schema,
            project_field_ids: self.field_ids.to_vec(),
            predicate,
            deletes,
        })
    }
}
/// Shared, read-only state for planning a scan of one snapshot.
// Field order is kept as-is: `#[derive(Debug)]` prints fields in declaration order.
#[derive(Debug)]
pub(crate) struct PlanContext {
    /// Snapshot being scanned; its manifest list is loaded via `object_cache`.
    pub snapshot: SnapshotRef,
    /// Table metadata, used to load the manifest list and build partition filters.
    pub table_metadata: TableMetadataRef,
    /// Schema that predicates are bound against and that scan tasks carry.
    pub snapshot_schema: SchemaRef,
    /// Whether predicate binding is case sensitive.
    pub case_sensitive: bool,
    /// Unbound scan filter. When present, it is bound per manifest to prune manifests.
    pub predicate: Option<Arc<Predicate>>,
    /// Scan filter bound to the snapshot schema; paired with the partition filter.
    pub snapshot_bound_predicate: Option<Arc<BoundPredicate>>,
    /// Cache for manifest lists and manifests.
    pub object_cache: Arc<ObjectCache>,
    /// Field ids projected by the scan.
    pub field_ids: Arc<Vec<i32>>,
    /// Cache of partition filters keyed by partition spec id.
    pub partition_filter_cache: Arc<PartitionFilterCache>,
    /// Cache of manifest evaluators keyed by partition spec id.
    pub manifest_evaluator_cache: Arc<ManifestEvaluatorCache>,
    /// Cache of expression evaluators, passed on to manifest contexts.
    pub expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
}
impl PlanContext {
pub(crate) async fn get_manifest_list(&self) -> Result<Arc<ManifestList>> {
self.object_cache
.as_ref()
.get_manifest_list(&self.snapshot, &self.table_metadata)
.await
}
fn get_partition_filter(&self, manifest_file: &ManifestFile) -> Result<Arc<BoundPredicate>> {
let partition_spec_id = manifest_file.partition_spec_id;
let partition_filter = self.partition_filter_cache.get(
partition_spec_id,
&self.table_metadata,
&self.snapshot_schema,
self.case_sensitive,
self.predicate
.as_ref()
.ok_or(Error::new(
ErrorKind::Unexpected,
"Expected a predicate but none present",
))?
.as_ref()
.bind(self.snapshot_schema.clone(), self.case_sensitive)?,
)?;
Ok(partition_filter)
}
pub(crate) fn build_manifest_file_contexts(
&self,
manifest_list: Arc<ManifestList>,
tx_data: Sender<ManifestEntryContext>,
delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<ManifestEntryContext>)>,
) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>> + 'static>> {
let manifest_files = manifest_list.entries().iter();
let mut filtered_mfcs = vec![];
for manifest_file in manifest_files {
let (delete_file_idx, tx) = if manifest_file.content == ManifestContentType::Deletes {
let Some((delete_file_idx, tx)) = delete_file_idx_and_tx.as_ref() else {
continue;
};
(Some(delete_file_idx.clone()), tx.clone())
} else {
(
delete_file_idx_and_tx.as_ref().map(|x| x.0.clone()),
tx_data.clone(),
)
};
let partition_bound_predicate = if self.predicate.is_some() {
let partition_bound_predicate = self.get_partition_filter(manifest_file)?;
if !self
.manifest_evaluator_cache
.get(
manifest_file.partition_spec_id,
partition_bound_predicate.clone(),
)
.eval(manifest_file)?
{
continue;
}
Some(partition_bound_predicate)
} else {
None
};
let mfc = self.create_manifest_file_context(
manifest_file,
partition_bound_predicate,
tx,
delete_file_idx,
);
filtered_mfcs.push(Ok(mfc));
}
Ok(Box::new(filtered_mfcs.into_iter()))
}
fn create_manifest_file_context(
&self,
manifest_file: &ManifestFile,
partition_filter: Option<Arc<BoundPredicate>>,
sender: Sender<ManifestEntryContext>,
delete_file_index: Option<DeleteFileIndex>,
) -> ManifestFileContext {
let bound_predicates =
if let (Some(ref partition_bound_predicate), Some(snapshot_bound_predicate)) =
(partition_filter, &self.snapshot_bound_predicate)
{
Some(Arc::new(BoundPredicates {
partition_bound_predicate: partition_bound_predicate.as_ref().clone(),
snapshot_bound_predicate: snapshot_bound_predicate.as_ref().clone(),
}))
} else {
None
};
ManifestFileContext {
manifest_file: manifest_file.clone(),
bound_predicates,
sender,
object_cache: self.object_cache.clone(),
snapshot_schema: self.snapshot_schema.clone(),
field_ids: self.field_ids.clone(),
expression_evaluator_cache: self.expression_evaluator_cache.clone(),
delete_file_index,
}
}
}