1use std::sync::Arc;
19
20use futures::channel::mpsc::Sender;
21use futures::{SinkExt, TryFutureExt};
22
23use crate::delete_file_index::DeleteFileIndex;
24use crate::expr::{Bind, BoundPredicate, Predicate};
25use crate::io::object_cache::ObjectCache;
26use crate::scan::{
27 BoundPredicates, ExpressionEvaluatorCache, FileScanTask, ManifestEvaluatorCache,
28 PartitionFilterCache,
29};
30use crate::spec::{
31 ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, SchemaRef, SnapshotRef,
32 TableMetadataRef,
33};
34use crate::{Error, ErrorKind, Result};
35
/// Everything needed to load one manifest file and forward each of its entries,
/// wrapped in a [`ManifestEntryContext`], over an mpsc channel.
///
/// Instances are produced by `PlanContext::create_manifest_file_context` and consumed by
/// `ManifestFileContext::fetch_manifest_and_stream_manifest_entries`.
pub(crate) struct ManifestFileContext {
    /// The manifest file whose entries will be streamed.
    manifest_file: ManifestFile,

    /// Channel on which one `ManifestEntryContext` is sent per manifest entry.
    sender: Sender<ManifestEntryContext>,

    /// Projected field ids, shared (via `Arc`) with every entry context built from this file.
    field_ids: Arc<Vec<i32>>,
    /// Partition/snapshot predicates; `Some` only when both a partition filter and a
    /// snapshot-bound predicate were available when this context was created.
    bound_predicates: Option<Arc<BoundPredicates>>,
    /// Cache used to load the manifest itself.
    object_cache: Arc<ObjectCache>,
    /// Snapshot schema forwarded to each entry context.
    snapshot_schema: SchemaRef,
    /// Expression evaluator cache forwarded to each entry context.
    expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
    /// Delete file index; a clone is handed to each entry context.
    delete_file_index: DeleteFileIndex,
    /// Case sensitivity flag forwarded to each entry context.
    case_sensitive: bool,
}
51
/// Per-entry work item sent over the manifest-entry channel.
///
/// It is turned into a [`FileScanTask`] by `ManifestEntryContext::into_file_scan_task`.
pub(crate) struct ManifestEntryContext {
    /// The manifest entry (data or delete file) this work item describes.
    pub manifest_entry: ManifestEntryRef,

    /// Expression evaluator cache shared across all entries of the scan.
    pub expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
    /// Projected field ids; copied into `FileScanTask::project_field_ids`.
    pub field_ids: Arc<Vec<i32>>,
    /// Optional partition/snapshot predicates; the snapshot-bound part becomes
    /// `FileScanTask::predicate`.
    pub bound_predicates: Option<Arc<BoundPredicates>>,
    /// Copied from the owning manifest file's `partition_spec_id`.
    pub partition_spec_id: i32,
    /// Snapshot schema; becomes `FileScanTask::schema`.
    pub snapshot_schema: SchemaRef,
    /// Index queried for the delete files that apply to this entry's data file.
    pub delete_file_index: DeleteFileIndex,
    /// Case sensitivity flag; becomes `FileScanTask::case_sensitive`.
    pub case_sensitive: bool,
}
65
66impl ManifestFileContext {
67 pub(crate) async fn fetch_manifest_and_stream_manifest_entries(self) -> Result<()> {
70 let ManifestFileContext {
71 object_cache,
72 manifest_file,
73 bound_predicates,
74 snapshot_schema,
75 field_ids,
76 mut sender,
77 expression_evaluator_cache,
78 delete_file_index,
79 ..
80 } = self;
81
82 let manifest = object_cache.get_manifest(&manifest_file).await?;
83
84 for manifest_entry in manifest.entries() {
85 let manifest_entry_context = ManifestEntryContext {
86 manifest_entry: manifest_entry.clone(),
88 expression_evaluator_cache: expression_evaluator_cache.clone(),
89 field_ids: field_ids.clone(),
90 partition_spec_id: manifest_file.partition_spec_id,
91 bound_predicates: bound_predicates.clone(),
92 snapshot_schema: snapshot_schema.clone(),
93 delete_file_index: delete_file_index.clone(),
94 case_sensitive: self.case_sensitive,
95 };
96
97 sender
98 .send(manifest_entry_context)
99 .map_err(|_| Error::new(ErrorKind::Unexpected, "mpsc channel SendError"))
100 .await?;
101 }
102
103 Ok(())
104 }
105}
106
107impl ManifestEntryContext {
108 pub(crate) async fn into_file_scan_task(self) -> Result<FileScanTask> {
111 let deletes = self
112 .delete_file_index
113 .get_deletes_for_data_file(
114 self.manifest_entry.data_file(),
115 self.manifest_entry.sequence_number(),
116 )
117 .await;
118
119 Ok(FileScanTask {
120 start: 0,
121 length: self.manifest_entry.file_size_in_bytes(),
122 record_count: Some(self.manifest_entry.record_count()),
123
124 data_file_path: self.manifest_entry.file_path().to_string(),
125 data_file_format: self.manifest_entry.file_format(),
126
127 schema: self.snapshot_schema,
128 project_field_ids: self.field_ids.to_vec(),
129 predicate: self
130 .bound_predicates
131 .map(|x| x.as_ref().snapshot_bound_predicate.clone()),
132
133 deletes,
134
135 partition: Some(self.manifest_entry.data_file.partition.clone()),
137 partition_spec: None,
139 name_mapping: None,
141 case_sensitive: self.case_sensitive,
142 })
143 }
144}
145
/// Snapshot-level state shared while planning a table scan.
///
/// Used to load the snapshot's manifest list and to build one
/// `ManifestFileContext` per manifest that survives partition pruning.
#[derive(Debug)]
pub(crate) struct PlanContext {
    /// Snapshot whose manifest list is read by `get_manifest_list`.
    pub snapshot: SnapshotRef,

    /// Table metadata passed to manifest-list loading and to the partition filter cache.
    pub table_metadata: TableMetadataRef,
    /// Schema the scan predicate is bound against.
    pub snapshot_schema: SchemaRef,
    /// Case sensitivity used when binding the predicate; also forwarded to each context.
    pub case_sensitive: bool,
    /// Unbound scan filter. When `Some`, manifests are pruned with a partition filter.
    pub predicate: Option<Arc<Predicate>>,
    /// Scan filter already bound to the snapshot schema. It is paired with the
    /// partition filter to form `BoundPredicates`.
    pub snapshot_bound_predicate: Option<Arc<BoundPredicate>>,
    /// Cache used to load the manifest list and manifests.
    pub object_cache: Arc<ObjectCache>,
    /// Projected field ids, shared with every manifest file context.
    pub field_ids: Arc<Vec<i32>>,

    /// Per-partition-spec cache of partition filters.
    pub partition_filter_cache: Arc<PartitionFilterCache>,
    /// Per-partition-spec cache of manifest evaluators used to skip manifest files.
    pub manifest_evaluator_cache: Arc<ManifestEvaluatorCache>,
    /// Expression evaluator cache forwarded to every manifest file context.
    pub expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
}
164
165impl PlanContext {
166 pub(crate) async fn get_manifest_list(&self) -> Result<Arc<ManifestList>> {
167 self.object_cache
168 .as_ref()
169 .get_manifest_list(&self.snapshot, &self.table_metadata)
170 .await
171 }
172
173 fn get_partition_filter(&self, manifest_file: &ManifestFile) -> Result<Arc<BoundPredicate>> {
174 let partition_spec_id = manifest_file.partition_spec_id;
175
176 let partition_filter = self.partition_filter_cache.get(
177 partition_spec_id,
178 &self.table_metadata,
179 &self.snapshot_schema,
180 self.case_sensitive,
181 self.predicate
182 .as_ref()
183 .ok_or(Error::new(
184 ErrorKind::Unexpected,
185 "Expected a predicate but none present",
186 ))?
187 .as_ref()
188 .bind(self.snapshot_schema.clone(), self.case_sensitive)?,
189 )?;
190
191 Ok(partition_filter)
192 }
193
194 pub(crate) fn build_manifest_file_contexts(
195 &self,
196 manifest_list: Arc<ManifestList>,
197 tx_data: Sender<ManifestEntryContext>,
198 delete_file_idx: DeleteFileIndex,
199 delete_file_tx: Sender<ManifestEntryContext>,
200 ) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>> + 'static>> {
201 let mut manifest_files = manifest_list.entries().iter().collect::<Vec<_>>();
202 manifest_files.sort_by_key(|m| match m.content {
209 ManifestContentType::Deletes => 0,
210 ManifestContentType::Data => 1,
211 });
212
213 let mut filtered_mfcs = vec![];
215 for manifest_file in manifest_files {
216 let tx = if manifest_file.content == ManifestContentType::Deletes {
217 delete_file_tx.clone()
218 } else {
219 tx_data.clone()
220 };
221
222 let partition_bound_predicate = if self.predicate.is_some() {
223 let partition_bound_predicate = self.get_partition_filter(manifest_file)?;
224
225 if !self
228 .manifest_evaluator_cache
229 .get(
230 manifest_file.partition_spec_id,
231 partition_bound_predicate.clone(),
232 )
233 .eval(manifest_file)?
234 {
235 continue;
236 }
237
238 Some(partition_bound_predicate)
239 } else {
240 None
241 };
242
243 let mfc = self.create_manifest_file_context(
244 manifest_file,
245 partition_bound_predicate,
246 tx,
247 delete_file_idx.clone(),
248 );
249
250 filtered_mfcs.push(Ok(mfc));
251 }
252
253 Ok(Box::new(filtered_mfcs.into_iter()))
254 }
255
256 fn create_manifest_file_context(
257 &self,
258 manifest_file: &ManifestFile,
259 partition_filter: Option<Arc<BoundPredicate>>,
260 sender: Sender<ManifestEntryContext>,
261 delete_file_index: DeleteFileIndex,
262 ) -> ManifestFileContext {
263 let bound_predicates =
264 if let (Some(ref partition_bound_predicate), Some(snapshot_bound_predicate)) =
265 (partition_filter, &self.snapshot_bound_predicate)
266 {
267 Some(Arc::new(BoundPredicates {
268 partition_bound_predicate: partition_bound_predicate.as_ref().clone(),
269 snapshot_bound_predicate: snapshot_bound_predicate.as_ref().clone(),
270 }))
271 } else {
272 None
273 };
274
275 ManifestFileContext {
276 manifest_file: manifest_file.clone(),
277 bound_predicates,
278 sender,
279 object_cache: self.object_cache.clone(),
280 snapshot_schema: self.snapshot_schema.clone(),
281 field_ids: self.field_ids.clone(),
282 expression_evaluator_cache: self.expression_evaluator_cache.clone(),
283 delete_file_index,
284 case_sensitive: self.case_sensitive,
285 }
286 }
287}