iceberg/scan/
context.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::sync::Arc;
19
20use futures::channel::mpsc::Sender;
21use futures::{SinkExt, TryFutureExt};
22
23use crate::delete_file_index::DeleteFileIndex;
24use crate::expr::{Bind, BoundPredicate, Predicate};
25use crate::io::object_cache::ObjectCache;
26use crate::scan::{
27    BoundPredicates, ExpressionEvaluatorCache, FileScanTask, ManifestEvaluatorCache,
28    PartitionFilterCache,
29};
30use crate::spec::{
31    ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, SchemaRef, SnapshotRef,
32    TableMetadataRef,
33};
34use crate::{Error, ErrorKind, Result};
35
36/// Wraps a [`ManifestFile`] alongside the objects that are needed
37/// to process it in a thread-safe manner
38pub(crate) struct ManifestFileContext {
39    manifest_file: ManifestFile,
40
41    sender: Sender<ManifestEntryContext>,
42
43    field_ids: Arc<Vec<i32>>,
44    bound_predicates: Option<Arc<BoundPredicates>>,
45    object_cache: Arc<ObjectCache>,
46    snapshot_schema: SchemaRef,
47    expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
48    delete_file_index: DeleteFileIndex,
49    case_sensitive: bool,
50}
51
52/// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
53/// to process it in a thread-safe manner
54pub(crate) struct ManifestEntryContext {
55    pub manifest_entry: ManifestEntryRef,
56
57    pub expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
58    pub field_ids: Arc<Vec<i32>>,
59    pub bound_predicates: Option<Arc<BoundPredicates>>,
60    pub partition_spec_id: i32,
61    pub snapshot_schema: SchemaRef,
62    pub delete_file_index: DeleteFileIndex,
63    pub case_sensitive: bool,
64}
65
66impl ManifestFileContext {
67    /// Consumes this [`ManifestFileContext`], fetching its Manifest from FileIO and then
68    /// streaming its constituent [`ManifestEntries`] to the channel provided in the context
69    pub(crate) async fn fetch_manifest_and_stream_manifest_entries(self) -> Result<()> {
70        let ManifestFileContext {
71            object_cache,
72            manifest_file,
73            bound_predicates,
74            snapshot_schema,
75            field_ids,
76            mut sender,
77            expression_evaluator_cache,
78            delete_file_index,
79            ..
80        } = self;
81
82        let manifest = object_cache.get_manifest(&manifest_file).await?;
83
84        for manifest_entry in manifest.entries() {
85            let manifest_entry_context = ManifestEntryContext {
86                // TODO: refactor to avoid the expensive ManifestEntry clone
87                manifest_entry: manifest_entry.clone(),
88                expression_evaluator_cache: expression_evaluator_cache.clone(),
89                field_ids: field_ids.clone(),
90                partition_spec_id: manifest_file.partition_spec_id,
91                bound_predicates: bound_predicates.clone(),
92                snapshot_schema: snapshot_schema.clone(),
93                delete_file_index: delete_file_index.clone(),
94                case_sensitive: self.case_sensitive,
95            };
96
97            sender
98                .send(manifest_entry_context)
99                .map_err(|_| Error::new(ErrorKind::Unexpected, "mpsc channel SendError"))
100                .await?;
101        }
102
103        Ok(())
104    }
105}
106
107impl ManifestEntryContext {
108    /// consume this `ManifestEntryContext`, returning a `FileScanTask`
109    /// created from it
110    pub(crate) async fn into_file_scan_task(self) -> Result<FileScanTask> {
111        let deletes = self
112            .delete_file_index
113            .get_deletes_for_data_file(
114                self.manifest_entry.data_file(),
115                self.manifest_entry.sequence_number(),
116            )
117            .await;
118
119        Ok(FileScanTask {
120            start: 0,
121            length: self.manifest_entry.file_size_in_bytes(),
122            record_count: Some(self.manifest_entry.record_count()),
123
124            data_file_path: self.manifest_entry.file_path().to_string(),
125            data_file_format: self.manifest_entry.file_format(),
126
127            schema: self.snapshot_schema,
128            project_field_ids: self.field_ids.to_vec(),
129            predicate: self
130                .bound_predicates
131                .map(|x| x.as_ref().snapshot_bound_predicate.clone()),
132
133            deletes,
134
135            // Include partition data and spec from manifest entry
136            partition: Some(self.manifest_entry.data_file.partition.clone()),
137            // TODO: Pass actual PartitionSpec through context chain for native flow
138            partition_spec: None,
139            // TODO: Extract name_mapping from table metadata property "schema.name-mapping.default"
140            name_mapping: None,
141            case_sensitive: self.case_sensitive,
142        })
143    }
144}
145
146/// PlanContext wraps a [`SnapshotRef`] alongside all the other
147/// objects that are required to perform a scan file plan.
148#[derive(Debug)]
149pub(crate) struct PlanContext {
150    pub snapshot: SnapshotRef,
151
152    pub table_metadata: TableMetadataRef,
153    pub snapshot_schema: SchemaRef,
154    pub case_sensitive: bool,
155    pub predicate: Option<Arc<Predicate>>,
156    pub snapshot_bound_predicate: Option<Arc<BoundPredicate>>,
157    pub object_cache: Arc<ObjectCache>,
158    pub field_ids: Arc<Vec<i32>>,
159
160    pub partition_filter_cache: Arc<PartitionFilterCache>,
161    pub manifest_evaluator_cache: Arc<ManifestEvaluatorCache>,
162    pub expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
163}
164
165impl PlanContext {
166    pub(crate) async fn get_manifest_list(&self) -> Result<Arc<ManifestList>> {
167        self.object_cache
168            .as_ref()
169            .get_manifest_list(&self.snapshot, &self.table_metadata)
170            .await
171    }
172
173    fn get_partition_filter(&self, manifest_file: &ManifestFile) -> Result<Arc<BoundPredicate>> {
174        let partition_spec_id = manifest_file.partition_spec_id;
175
176        let partition_filter = self.partition_filter_cache.get(
177            partition_spec_id,
178            &self.table_metadata,
179            &self.snapshot_schema,
180            self.case_sensitive,
181            self.predicate
182                .as_ref()
183                .ok_or(Error::new(
184                    ErrorKind::Unexpected,
185                    "Expected a predicate but none present",
186                ))?
187                .as_ref()
188                .bind(self.snapshot_schema.clone(), self.case_sensitive)?,
189        )?;
190
191        Ok(partition_filter)
192    }
193
194    pub(crate) fn build_manifest_file_contexts(
195        &self,
196        manifest_list: Arc<ManifestList>,
197        tx_data: Sender<ManifestEntryContext>,
198        delete_file_idx: DeleteFileIndex,
199        delete_file_tx: Sender<ManifestEntryContext>,
200    ) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>> + 'static>> {
201        let mut manifest_files = manifest_list.entries().iter().collect::<Vec<_>>();
202        // Sort manifest files to process delete manifests first.
203        // This avoids a deadlock where the producer blocks on sending data manifest entries
204        // (because the data channel is full) while the delete manifest consumer is waiting
205        // for delete manifest entries (which haven't been produced yet).
206        // By processing delete manifests first, we ensure the delete consumer can finish,
207        // which then allows the data consumer to start draining the data channel.
208        manifest_files.sort_by_key(|m| match m.content {
209            ManifestContentType::Deletes => 0,
210            ManifestContentType::Data => 1,
211        });
212
213        // TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
214        let mut filtered_mfcs = vec![];
215        for manifest_file in manifest_files {
216            let tx = if manifest_file.content == ManifestContentType::Deletes {
217                delete_file_tx.clone()
218            } else {
219                tx_data.clone()
220            };
221
222            let partition_bound_predicate = if self.predicate.is_some() {
223                let partition_bound_predicate = self.get_partition_filter(manifest_file)?;
224
225                // evaluate the ManifestFile against the partition filter. Skip
226                // if it cannot contain any matching rows
227                if !self
228                    .manifest_evaluator_cache
229                    .get(
230                        manifest_file.partition_spec_id,
231                        partition_bound_predicate.clone(),
232                    )
233                    .eval(manifest_file)?
234                {
235                    continue;
236                }
237
238                Some(partition_bound_predicate)
239            } else {
240                None
241            };
242
243            let mfc = self.create_manifest_file_context(
244                manifest_file,
245                partition_bound_predicate,
246                tx,
247                delete_file_idx.clone(),
248            );
249
250            filtered_mfcs.push(Ok(mfc));
251        }
252
253        Ok(Box::new(filtered_mfcs.into_iter()))
254    }
255
256    fn create_manifest_file_context(
257        &self,
258        manifest_file: &ManifestFile,
259        partition_filter: Option<Arc<BoundPredicate>>,
260        sender: Sender<ManifestEntryContext>,
261        delete_file_index: DeleteFileIndex,
262    ) -> ManifestFileContext {
263        let bound_predicates =
264            if let (Some(ref partition_bound_predicate), Some(snapshot_bound_predicate)) =
265                (partition_filter, &self.snapshot_bound_predicate)
266            {
267                Some(Arc::new(BoundPredicates {
268                    partition_bound_predicate: partition_bound_predicate.as_ref().clone(),
269                    snapshot_bound_predicate: snapshot_bound_predicate.as_ref().clone(),
270                }))
271            } else {
272                None
273            };
274
275        ManifestFileContext {
276            manifest_file: manifest_file.clone(),
277            bound_predicates,
278            sender,
279            object_cache: self.object_cache.clone(),
280            snapshot_schema: self.snapshot_schema.clone(),
281            field_ids: self.field_ids.clone(),
282            expression_evaluator_cache: self.expression_evaluator_cache.clone(),
283            delete_file_index,
284            case_sensitive: self.case_sensitive,
285        }
286    }
287}