iceberg/io/
object_cache.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::mem::size_of_val;
19use std::sync::Arc;
20
21use crate::io::FileIO;
22use crate::spec::{
23    FormatVersion, Manifest, ManifestFile, ManifestList, SchemaId, SnapshotRef, TableMetadataRef,
24};
25use crate::{Error, ErrorKind, Result};
26
/// Capacity, in bytes, given to the cache when [`ObjectCache::new`] is used
/// without an explicit size: 32 MiB.
const DEFAULT_CACHE_SIZE_BYTES: u64 = 32 * 1024 * 1024; // 32 MiB
28
/// A value stored in the object cache.
///
/// Each variant holds the parsed object behind an [`Arc`], so cloning a
/// `CachedItem` only clones the pointer, not the parsed metadata.
#[derive(Clone, Debug)]
pub(crate) enum CachedItem {
    /// A parsed manifest list.
    ManifestList(Arc<ManifestList>),
    /// A parsed manifest.
    Manifest(Arc<Manifest>),
}
34
/// Key identifying an entry in the object cache.
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub(crate) enum CachedObjectKey {
    /// Key for a manifest list: the manifest list location of the snapshot,
    /// the table's format version, and the snapshot's schema id.
    ManifestList((String, FormatVersion, SchemaId)),
    /// Key for a manifest: the manifest file's path.
    Manifest(String),
}
40
/// Caches metadata objects deserialized from immutable files
///
/// Manifests and manifest lists are loaded through the contained [`FileIO`]
/// on a cache miss, or on every call when caching is disabled.
#[derive(Clone, Debug)]
pub struct ObjectCache {
    /// Backing store mapping object keys to parsed, `Arc`-wrapped objects.
    cache: moka::future::Cache<CachedObjectKey, CachedItem>,
    /// Used to read manifests and manifest lists that are not served from the cache.
    file_io: FileIO,
    /// When `true`, lookups skip `cache` and always load through `file_io`.
    cache_disabled: bool,
}
48
49impl ObjectCache {
50    /// Creates a new [`ObjectCache`]
51    /// with the default cache size
52    pub(crate) fn new(file_io: FileIO) -> Self {
53        Self::new_with_capacity(file_io, DEFAULT_CACHE_SIZE_BYTES)
54    }
55
56    /// Creates a new [`ObjectCache`]
57    /// with a specific cache size
58    pub(crate) fn new_with_capacity(file_io: FileIO, cache_size_bytes: u64) -> Self {
59        if cache_size_bytes == 0 {
60            Self::with_disabled_cache(file_io)
61        } else {
62            Self {
63                cache: moka::future::Cache::builder()
64                    .weigher(|_, val: &CachedItem| match val {
65                        CachedItem::ManifestList(item) => size_of_val(item.as_ref()),
66                        CachedItem::Manifest(item) => size_of_val(item.as_ref()),
67                    } as u32)
68                    .max_capacity(cache_size_bytes)
69                    .build(),
70                file_io,
71                cache_disabled: false,
72            }
73        }
74    }
75
76    /// Creates a new [`ObjectCache`]
77    /// with caching disabled
78    pub(crate) fn with_disabled_cache(file_io: FileIO) -> Self {
79        Self {
80            cache: moka::future::Cache::new(0),
81            file_io,
82            cache_disabled: true,
83        }
84    }
85
86    /// Retrieves an Arc [`Manifest`] from the cache
87    /// or retrieves one from FileIO and parses it if not present
88    pub(crate) async fn get_manifest(&self, manifest_file: &ManifestFile) -> Result<Arc<Manifest>> {
89        if self.cache_disabled {
90            return manifest_file
91                .load_manifest(&self.file_io)
92                .await
93                .map(Arc::new);
94        }
95
96        let key = CachedObjectKey::Manifest(manifest_file.manifest_path.clone());
97
98        let cache_entry = self
99            .cache
100            .entry_by_ref(&key)
101            .or_try_insert_with(self.fetch_and_parse_manifest(manifest_file))
102            .await
103            .map_err(|err| {
104                Error::new(
105                    ErrorKind::Unexpected,
106                    format!("Failed to load manifest {}", manifest_file.manifest_path),
107                )
108                .with_source(err)
109            })?
110            .into_value();
111
112        match cache_entry {
113            CachedItem::Manifest(arc_manifest) => Ok(arc_manifest),
114            _ => Err(Error::new(
115                ErrorKind::Unexpected,
116                format!("cached object for key '{key:?}' is not a Manifest"),
117            )),
118        }
119    }
120
121    /// Retrieves an Arc [`ManifestList`] from the cache
122    /// or retrieves one from FileIO and parses it if not present
123    pub(crate) async fn get_manifest_list(
124        &self,
125        snapshot: &SnapshotRef,
126        table_metadata: &TableMetadataRef,
127    ) -> Result<Arc<ManifestList>> {
128        if self.cache_disabled {
129            return snapshot
130                .load_manifest_list(&self.file_io, table_metadata)
131                .await
132                .map(Arc::new);
133        }
134
135        let key = CachedObjectKey::ManifestList((
136            snapshot.manifest_list().to_string(),
137            table_metadata.format_version,
138            snapshot.schema_id().unwrap(),
139        ));
140        let cache_entry = self
141            .cache
142            .entry_by_ref(&key)
143            .or_try_insert_with(self.fetch_and_parse_manifest_list(snapshot, table_metadata))
144            .await
145            .map_err(|err| {
146                Arc::try_unwrap(err).unwrap_or_else(|err| {
147                    Error::new(
148                        ErrorKind::Unexpected,
149                        "Failed to load manifest list in cache",
150                    )
151                    .with_source(err)
152                })
153            })?
154            .into_value();
155
156        match cache_entry {
157            CachedItem::ManifestList(arc_manifest_list) => Ok(arc_manifest_list),
158            _ => Err(Error::new(
159                ErrorKind::Unexpected,
160                format!("cached object for path '{key:?}' is not a manifest list"),
161            )),
162        }
163    }
164
165    async fn fetch_and_parse_manifest(&self, manifest_file: &ManifestFile) -> Result<CachedItem> {
166        let manifest = manifest_file.load_manifest(&self.file_io).await?;
167
168        Ok(CachedItem::Manifest(Arc::new(manifest)))
169    }
170
171    async fn fetch_and_parse_manifest_list(
172        &self,
173        snapshot: &SnapshotRef,
174        table_metadata: &TableMetadataRef,
175    ) -> Result<CachedItem> {
176        let manifest_list = snapshot
177            .load_manifest_list(&self.file_io, table_metadata)
178            .await?;
179
180        Ok(CachedItem::ManifestList(Arc::new(manifest_list)))
181    }
182}
183
#[cfg(test)]
mod tests {
    use std::fs;

    use minijinja::value::Value;
    use minijinja::{AutoEscape, Environment, context};
    use tempfile::TempDir;
    use uuid::Uuid;

    use super::*;
    use crate::TableIdent;
    use crate::io::{FileIO, OutputFile};
    use crate::spec::{
        DataContentType, DataFileBuilder, DataFileFormat, Literal, ManifestEntry,
        ManifestListWriter, ManifestStatus, ManifestWriterBuilder, Struct, TableMetadata,
    };
    use crate::table::Table;

    /// Renders a minijinja template string with auto-escaping turned off.
    fn render_template(template: &str, ctx: Value) -> String {
        let mut env = Environment::new();
        env.set_auto_escape_callback(|_| AutoEscape::None);
        env.render_str(template, ctx).unwrap()
    }

    /// Returns the final path segment of the first entry's data file path.
    fn first_entry_file_name(manifest: &Manifest) -> String {
        manifest
            .entries()
            .first()
            .unwrap()
            .file_path()
            .rsplit('/')
            .next()
            .unwrap()
            .to_string()
    }

    /// A table rooted in a temporary directory, built from the v2 example metadata.
    struct TableTestFixture {
        /// Owns the temporary directory so it outlives the test body and is
        /// removed when the fixture is dropped.
        _tmp_dir: TempDir,
        table_location: String,
        table: Table,
    }

    impl TableTestFixture {
        /// Builds the fixture table; no manifest files are written yet.
        fn new() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
                .unwrap()
                .build()
                .unwrap();

            let table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    manifest_list_1_location => &manifest_list1_location,
                    manifest_list_2_location => &manifest_list2_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
                .build()
                .unwrap();

            Self {
                _tmp_dir: tmp_dir,
                table_location: table_location.to_str().unwrap().to_string(),
                table,
            }
        }

        /// Creates an output file for a new, uniquely named manifest under the table's metadata directory.
        fn next_manifest_file(&self) -> OutputFile {
            self.table
                .file_io()
                .new_output(format!(
                    "{}/metadata/manifest_{}.avro",
                    self.table_location,
                    Uuid::new_v4()
                ))
                .unwrap()
        }

        /// Writes one data manifest with a single entry, plus the current
        /// snapshot's manifest list referencing it.
        async fn setup_manifest_files(&self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = self.table.metadata().default_partition_spec();

            // Write data files
            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();
            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/1.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(100))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            let data_file_manifest = writer.write_manifest_file().await.unwrap();

            // Write to manifest list
            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_file_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();
        }
    }

    #[test]
    fn test_zero_capacity_disables_cache() {
        let fixture = TableTestFixture::new();

        let object_cache = ObjectCache::new_with_capacity(fixture.table.file_io().clone(), 0);

        assert!(object_cache.cache_disabled);
    }

    #[tokio::test]
    async fn test_get_manifest_list_and_manifest_from_disabled_cache() {
        let fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let object_cache = ObjectCache::with_disabled_cache(fixture.table.file_io().clone());

        let result_manifest_list = object_cache
            .get_manifest_list(
                fixture.table.metadata().current_snapshot().unwrap(),
                &fixture.table.metadata_ref(),
            )
            .await
            .unwrap();

        assert_eq!(result_manifest_list.entries().len(), 1);

        let manifest_file = result_manifest_list.entries().first().unwrap();
        let result_manifest = object_cache.get_manifest(manifest_file).await.unwrap();

        assert_eq!(first_entry_file_name(&result_manifest), "1.parquet");
    }

    #[tokio::test]
    async fn test_get_manifest_list_and_manifest_from_default_cache() {
        let fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let object_cache = ObjectCache::new(fixture.table.file_io().clone());

        // not in cache
        let first_manifest_list = object_cache
            .get_manifest_list(
                fixture.table.metadata().current_snapshot().unwrap(),
                &fixture.table.metadata_ref(),
            )
            .await
            .unwrap();

        assert_eq!(first_manifest_list.entries().len(), 1);

        // retrieve cached version
        let cached_manifest_list = object_cache
            .get_manifest_list(
                fixture.table.metadata().current_snapshot().unwrap(),
                &fixture.table.metadata_ref(),
            )
            .await
            .unwrap();

        assert_eq!(cached_manifest_list.entries().len(), 1);
        // A cache hit hands back the very same allocation instead of re-parsing.
        assert!(Arc::ptr_eq(&first_manifest_list, &cached_manifest_list));

        let manifest_file = cached_manifest_list.entries().first().unwrap();

        // not in cache
        let first_manifest = object_cache.get_manifest(manifest_file).await.unwrap();

        assert_eq!(first_entry_file_name(&first_manifest), "1.parquet");

        // retrieve cached version
        let cached_manifest = object_cache.get_manifest(manifest_file).await.unwrap();

        assert_eq!(first_entry_file_name(&cached_manifest), "1.parquet");
        assert!(Arc::ptr_eq(&first_manifest, &cached_manifest));
    }
}