1use std::mem::size_of_val;
19use std::sync::Arc;
20
21use crate::io::FileIO;
22use crate::spec::{
23 FormatVersion, Manifest, ManifestFile, ManifestList, SchemaId, SnapshotRef, TableMetadataRef,
24};
25use crate::{Error, ErrorKind, Result};
26
27const DEFAULT_CACHE_SIZE_BYTES: u64 = 32 * 1024 * 1024; #[derive(Clone, Debug)]
30pub(crate) enum CachedItem {
31 ManifestList(Arc<ManifestList>),
32 Manifest(Arc<Manifest>),
33}
34
35#[derive(Clone, Debug, Hash, Eq, PartialEq)]
36pub(crate) enum CachedObjectKey {
37 ManifestList((String, FormatVersion, SchemaId)),
38 Manifest(String),
39}
40
41#[derive(Clone, Debug)]
43pub struct ObjectCache {
44 cache: moka::future::Cache<CachedObjectKey, CachedItem>,
45 file_io: FileIO,
46 cache_disabled: bool,
47}
48
49impl ObjectCache {
50 pub(crate) fn new(file_io: FileIO) -> Self {
53 Self::new_with_capacity(file_io, DEFAULT_CACHE_SIZE_BYTES)
54 }
55
56 pub(crate) fn new_with_capacity(file_io: FileIO, cache_size_bytes: u64) -> Self {
59 if cache_size_bytes == 0 {
60 Self::with_disabled_cache(file_io)
61 } else {
62 Self {
63 cache: moka::future::Cache::builder()
64 .weigher(|_, val: &CachedItem| match val {
65 CachedItem::ManifestList(item) => size_of_val(item.as_ref()),
66 CachedItem::Manifest(item) => size_of_val(item.as_ref()),
67 } as u32)
68 .max_capacity(cache_size_bytes)
69 .build(),
70 file_io,
71 cache_disabled: false,
72 }
73 }
74 }
75
76 pub(crate) fn with_disabled_cache(file_io: FileIO) -> Self {
79 Self {
80 cache: moka::future::Cache::new(0),
81 file_io,
82 cache_disabled: true,
83 }
84 }
85
86 pub(crate) async fn get_manifest(&self, manifest_file: &ManifestFile) -> Result<Arc<Manifest>> {
89 if self.cache_disabled {
90 return manifest_file
91 .load_manifest(&self.file_io)
92 .await
93 .map(Arc::new);
94 }
95
96 let key = CachedObjectKey::Manifest(manifest_file.manifest_path.clone());
97
98 let cache_entry = self
99 .cache
100 .entry_by_ref(&key)
101 .or_try_insert_with(self.fetch_and_parse_manifest(manifest_file))
102 .await
103 .map_err(|err| {
104 Error::new(
105 ErrorKind::Unexpected,
106 format!("Failed to load manifest {}", manifest_file.manifest_path),
107 )
108 .with_source(err)
109 })?
110 .into_value();
111
112 match cache_entry {
113 CachedItem::Manifest(arc_manifest) => Ok(arc_manifest),
114 _ => Err(Error::new(
115 ErrorKind::Unexpected,
116 format!("cached object for key '{key:?}' is not a Manifest"),
117 )),
118 }
119 }
120
121 pub(crate) async fn get_manifest_list(
124 &self,
125 snapshot: &SnapshotRef,
126 table_metadata: &TableMetadataRef,
127 ) -> Result<Arc<ManifestList>> {
128 if self.cache_disabled {
129 return snapshot
130 .load_manifest_list(&self.file_io, table_metadata)
131 .await
132 .map(Arc::new);
133 }
134
135 let key = CachedObjectKey::ManifestList((
136 snapshot.manifest_list().to_string(),
137 table_metadata.format_version,
138 snapshot.schema_id().unwrap(),
139 ));
140 let cache_entry = self
141 .cache
142 .entry_by_ref(&key)
143 .or_try_insert_with(self.fetch_and_parse_manifest_list(snapshot, table_metadata))
144 .await
145 .map_err(|err| {
146 Arc::try_unwrap(err).unwrap_or_else(|err| {
147 Error::new(
148 ErrorKind::Unexpected,
149 "Failed to load manifest list in cache",
150 )
151 .with_source(err)
152 })
153 })?
154 .into_value();
155
156 match cache_entry {
157 CachedItem::ManifestList(arc_manifest_list) => Ok(arc_manifest_list),
158 _ => Err(Error::new(
159 ErrorKind::Unexpected,
160 format!("cached object for path '{key:?}' is not a manifest list"),
161 )),
162 }
163 }
164
165 async fn fetch_and_parse_manifest(&self, manifest_file: &ManifestFile) -> Result<CachedItem> {
166 let manifest = manifest_file.load_manifest(&self.file_io).await?;
167
168 Ok(CachedItem::Manifest(Arc::new(manifest)))
169 }
170
171 async fn fetch_and_parse_manifest_list(
172 &self,
173 snapshot: &SnapshotRef,
174 table_metadata: &TableMetadataRef,
175 ) -> Result<CachedItem> {
176 let manifest_list = snapshot
177 .load_manifest_list(&self.file_io, table_metadata)
178 .await?;
179
180 Ok(CachedItem::ManifestList(Arc::new(manifest_list)))
181 }
182}
183
#[cfg(test)]
mod tests {
    use std::fs;

    use minijinja::value::Value;
    use minijinja::{AutoEscape, Environment, context};
    use tempfile::TempDir;
    use uuid::Uuid;

    use super::*;
    use crate::TableIdent;
    use crate::io::{FileIO, OutputFile};
    use crate::spec::{
        DataContentType, DataFileBuilder, DataFileFormat, Literal, ManifestEntry,
        ManifestListWriter, ManifestStatus, ManifestWriterBuilder, Struct, TableMetadata,
    };
    use crate::table::Table;

    /// Renders a minijinja `template` with `ctx`, without HTML auto-escaping.
    fn render_template(template: &str, ctx: Value) -> String {
        let mut env = Environment::new();
        env.set_auto_escape_callback(|_| AutoEscape::None);
        env.render_str(template, ctx).unwrap()
    }

    /// Returns the last `/`-separated component of `path`.
    fn file_name(path: &str) -> &str {
        path.rsplit('/').next().unwrap_or(path)
    }

    /// A table rooted in a temporary directory, built from the v2 example metadata template.
    struct TableTestFixture {
        // Kept only so the temporary directory lives as long as the fixture: dropping a
        // `TempDir` deletes the directory, and files written later would otherwise leak.
        _tmp_dir: TempDir,
        table_location: String,
        table: Table,
    }

    impl TableTestFixture {
        fn new() -> Self {
            let tmp_dir = TempDir::new().unwrap();
            let table_location = tmp_dir.path().join("table1");
            let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
            let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
            let table_metadata1_location = table_location.join("metadata/v1.json");

            let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
                .unwrap()
                .build()
                .unwrap();

            let table_metadata = {
                let template_json_str = fs::read_to_string(format!(
                    "{}/testdata/example_table_metadata_v2.json",
                    env!("CARGO_MANIFEST_DIR")
                ))
                .unwrap();
                let metadata_json = render_template(&template_json_str, context! {
                    table_location => &table_location,
                    manifest_list_1_location => &manifest_list1_location,
                    manifest_list_2_location => &manifest_list2_location,
                    table_metadata_1_location => &table_metadata1_location,
                });
                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
            };

            let table = Table::builder()
                .metadata(table_metadata)
                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
                .file_io(file_io.clone())
                .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
                .build()
                .unwrap();

            Self {
                table_location: table_location.to_str().unwrap().to_string(),
                _tmp_dir: tmp_dir,
                table,
            }
        }

        /// Returns an output file at a fresh, uniquely named manifest path under the table.
        fn next_manifest_file(&self) -> OutputFile {
            self.table
                .file_io()
                .new_output(format!(
                    "{}/metadata/manifest_{}.avro",
                    self.table_location,
                    Uuid::new_v4()
                ))
                .unwrap()
        }

        /// Writes one data manifest (a single `1.parquet` entry) and the current snapshot's
        /// manifest list pointing at it.
        async fn setup_manifest_files(&mut self) {
            let current_snapshot = self.table.metadata().current_snapshot().unwrap();
            let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
            let current_partition_spec = self.table.metadata().default_partition_spec();

            let mut writer = ManifestWriterBuilder::new(
                self.next_manifest_file(),
                Some(current_snapshot.snapshot_id()),
                None,
                current_schema.clone(),
                current_partition_spec.as_ref().clone(),
            )
            .build_v2_data();
            writer
                .add_entry(
                    ManifestEntry::builder()
                        .status(ManifestStatus::Added)
                        .data_file(
                            DataFileBuilder::default()
                                .partition_spec_id(0)
                                .content(DataContentType::Data)
                                .file_path(format!("{}/1.parquet", &self.table_location))
                                .file_format(DataFileFormat::Parquet)
                                .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(100))]))
                                .build()
                                .unwrap(),
                        )
                        .build(),
                )
                .unwrap();
            let data_file_manifest = writer.write_manifest_file().await.unwrap();

            let mut manifest_list_write = ManifestListWriter::v2(
                self.table
                    .file_io()
                    .new_output(current_snapshot.manifest_list())
                    .unwrap(),
                current_snapshot.snapshot_id(),
                current_snapshot.parent_snapshot_id(),
                current_snapshot.sequence_number(),
            );
            manifest_list_write
                .add_manifests(vec![data_file_manifest].into_iter())
                .unwrap();
            manifest_list_write.close().await.unwrap();
        }
    }

    #[tokio::test]
    async fn test_get_manifest_list_and_manifest_from_disabled_cache() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let object_cache = ObjectCache::with_disabled_cache(fixture.table.file_io().clone());
        let snapshot = fixture.table.metadata().current_snapshot().unwrap();
        let table_metadata = fixture.table.metadata_ref();

        let first_list = object_cache
            .get_manifest_list(snapshot, &table_metadata)
            .await
            .unwrap();
        let second_list = object_cache
            .get_manifest_list(snapshot, &table_metadata)
            .await
            .unwrap();

        assert_eq!(first_list.entries().len(), 1);
        // With caching disabled every call reloads the file, yielding distinct allocations.
        assert!(!std::sync::Arc::ptr_eq(&first_list, &second_list));

        let manifest_file = first_list.entries().first().unwrap();
        let first_manifest = object_cache.get_manifest(manifest_file).await.unwrap();
        let second_manifest = object_cache.get_manifest(manifest_file).await.unwrap();

        assert_eq!(
            file_name(first_manifest.entries().first().unwrap().file_path()),
            "1.parquet"
        );
        assert!(!std::sync::Arc::ptr_eq(&first_manifest, &second_manifest));
    }

    #[tokio::test]
    async fn test_get_manifest_list_and_manifest_from_default_cache() {
        let mut fixture = TableTestFixture::new();
        fixture.setup_manifest_files().await;

        let object_cache = ObjectCache::new(fixture.table.file_io().clone());
        let snapshot = fixture.table.metadata().current_snapshot().unwrap();
        let table_metadata = fixture.table.metadata_ref();

        // First call misses and populates the cache; the second should be served from it.
        let first_list = object_cache
            .get_manifest_list(snapshot, &table_metadata)
            .await
            .unwrap();
        let second_list = object_cache
            .get_manifest_list(snapshot, &table_metadata)
            .await
            .unwrap();

        assert_eq!(first_list.entries().len(), 1);
        assert_eq!(second_list.entries().len(), 1);
        assert!(std::sync::Arc::ptr_eq(&first_list, &second_list));

        let manifest_file = second_list.entries().first().unwrap();

        let first_manifest = object_cache.get_manifest(manifest_file).await.unwrap();
        let second_manifest = object_cache.get_manifest(manifest_file).await.unwrap();

        for manifest in [&first_manifest, &second_manifest] {
            assert_eq!(
                file_name(manifest.entries().first().unwrap().file_path()),
                "1.parquet"
            );
        }
        assert!(std::sync::Arc::ptr_eq(&first_manifest, &second_manifest));
    }
}