iceberg/scan/
cache.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::sync::{Arc, RwLock};
20
21use crate::expr::visitors::expression_evaluator::ExpressionEvaluator;
22use crate::expr::visitors::inclusive_projection::InclusiveProjection;
23use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
24use crate::expr::{Bind, BoundPredicate};
25use crate::spec::{Schema, TableMetadataRef};
26use crate::{Error, ErrorKind, Result};
27
28/// Manages the caching of [`BoundPredicate`] objects
29/// for [`PartitionSpec`]s based on partition spec id.
30#[derive(Debug)]
31pub(crate) struct PartitionFilterCache(RwLock<HashMap<i32, Arc<BoundPredicate>>>);
32
33impl PartitionFilterCache {
34    /// Creates a new [`PartitionFilterCache`]
35    /// with an empty internal HashMap.
36    pub(crate) fn new() -> Self {
37        Self(RwLock::new(HashMap::new()))
38    }
39
40    /// Retrieves a [`BoundPredicate`] from the cache
41    /// or computes it if not present.
42    pub(crate) fn get(
43        &self,
44        spec_id: i32,
45        table_metadata: &TableMetadataRef,
46        schema: &Schema,
47        case_sensitive: bool,
48        filter: BoundPredicate,
49    ) -> Result<Arc<BoundPredicate>> {
50        // we need a block here to ensure that the `read()` gets dropped before we hit the `write()`
51        // below, otherwise we hit deadlock
52        {
53            let read = self.0.read().map_err(|_| {
54                Error::new(
55                    ErrorKind::Unexpected,
56                    "PartitionFilterCache RwLock was poisoned",
57                )
58            })?;
59
60            if read.contains_key(&spec_id) {
61                return Ok(read.get(&spec_id).unwrap().clone());
62            }
63        }
64
65        let partition_spec = table_metadata
66            .partition_spec_by_id(spec_id)
67            .ok_or(Error::new(
68                ErrorKind::Unexpected,
69                format!("Could not find partition spec for id {spec_id}"),
70            ))?;
71
72        let partition_type = partition_spec.partition_type(schema)?;
73        let partition_fields = partition_type.fields().to_owned();
74        let partition_schema = Arc::new(
75            Schema::builder()
76                .with_schema_id(partition_spec.spec_id())
77                .with_fields(partition_fields)
78                .build()?,
79        );
80
81        let mut inclusive_projection = InclusiveProjection::new(partition_spec.clone());
82
83        let partition_filter = inclusive_projection
84            .project(&filter)?
85            .rewrite_not()
86            .bind(partition_schema.clone(), case_sensitive)?;
87
88        self.0
89            .write()
90            .map_err(|_| {
91                Error::new(
92                    ErrorKind::Unexpected,
93                    "PartitionFilterCache RwLock was poisoned",
94                )
95            })?
96            .insert(spec_id, Arc::new(partition_filter));
97
98        let read = self.0.read().map_err(|_| {
99            Error::new(
100                ErrorKind::Unexpected,
101                "PartitionFilterCache RwLock was poisoned",
102            )
103        })?;
104
105        Ok(read.get(&spec_id).unwrap().clone())
106    }
107}
108
109/// Manages the caching of [`ManifestEvaluator`] objects
110/// for [`PartitionSpec`]s based on partition spec id.
111#[derive(Debug)]
112pub(crate) struct ManifestEvaluatorCache(RwLock<HashMap<i32, Arc<ManifestEvaluator>>>);
113
114impl ManifestEvaluatorCache {
115    /// Creates a new [`ManifestEvaluatorCache`]
116    /// with an empty internal HashMap.
117    pub(crate) fn new() -> Self {
118        Self(RwLock::new(HashMap::new()))
119    }
120
121    /// Retrieves a [`ManifestEvaluator`] from the cache
122    /// or computes it if not present.
123    pub(crate) fn get(
124        &self,
125        spec_id: i32,
126        partition_filter: Arc<BoundPredicate>,
127    ) -> Arc<ManifestEvaluator> {
128        // we need a block here to ensure that the `read()` gets dropped before we hit the `write()`
129        // below, otherwise we hit deadlock
130        {
131            let read = self
132                .0
133                .read()
134                .map_err(|_| {
135                    Error::new(
136                        ErrorKind::Unexpected,
137                        "ManifestEvaluatorCache RwLock was poisoned",
138                    )
139                })
140                .unwrap();
141
142            if read.contains_key(&spec_id) {
143                return read.get(&spec_id).unwrap().clone();
144            }
145        }
146
147        self.0
148            .write()
149            .map_err(|_| {
150                Error::new(
151                    ErrorKind::Unexpected,
152                    "ManifestEvaluatorCache RwLock was poisoned",
153                )
154            })
155            .unwrap()
156            .insert(
157                spec_id,
158                Arc::new(ManifestEvaluator::builder(partition_filter.as_ref().clone()).build()),
159            );
160
161        let read = self
162            .0
163            .read()
164            .map_err(|_| {
165                Error::new(
166                    ErrorKind::Unexpected,
167                    "ManifestEvaluatorCache RwLock was poisoned",
168                )
169            })
170            .unwrap();
171
172        read.get(&spec_id).unwrap().clone()
173    }
174}
175
176/// Manages the caching of [`ExpressionEvaluator`] objects
177/// for [`PartitionSpec`]s based on partition spec id.
178#[derive(Debug)]
179pub(crate) struct ExpressionEvaluatorCache(RwLock<HashMap<i32, Arc<ExpressionEvaluator>>>);
180
181impl ExpressionEvaluatorCache {
182    /// Creates a new [`ExpressionEvaluatorCache`]
183    /// with an empty internal HashMap.
184    pub(crate) fn new() -> Self {
185        Self(RwLock::new(HashMap::new()))
186    }
187
188    /// Retrieves a [`ExpressionEvaluator`] from the cache
189    /// or computes it if not present.
190    pub(crate) fn get(
191        &self,
192        spec_id: i32,
193        partition_filter: &BoundPredicate,
194    ) -> Result<Arc<ExpressionEvaluator>> {
195        // we need a block here to ensure that the `read()` gets dropped before we hit the `write()`
196        // below, otherwise we hit deadlock
197        {
198            let read = self.0.read().map_err(|_| {
199                Error::new(
200                    ErrorKind::Unexpected,
201                    "PartitionFilterCache RwLock was poisoned",
202                )
203            })?;
204
205            if read.contains_key(&spec_id) {
206                return Ok(read.get(&spec_id).unwrap().clone());
207            }
208        }
209
210        self.0
211            .write()
212            .map_err(|_| {
213                Error::new(
214                    ErrorKind::Unexpected,
215                    "ManifestEvaluatorCache RwLock was poisoned",
216                )
217            })
218            .unwrap()
219            .insert(
220                spec_id,
221                Arc::new(ExpressionEvaluator::new(partition_filter.clone())),
222            );
223
224        let read = self
225            .0
226            .read()
227            .map_err(|_| {
228                Error::new(
229                    ErrorKind::Unexpected,
230                    "ManifestEvaluatorCache RwLock was poisoned",
231                )
232            })
233            .unwrap();
234
235        Ok(read.get(&spec_id).unwrap().clone())
236    }
237}