1use std::sync::Arc;
21
22use crate::arrow::ArrowReaderBuilder;
23use crate::encryption::EncryptionManager;
24use crate::encryption::kms::KeyManagementClient;
25use crate::inspect::MetadataTable;
26use crate::io::FileIO;
27use crate::io::object_cache::ObjectCache;
28use crate::runtime::Runtime;
29use crate::scan::TableScanBuilder;
30use crate::spec::{ManifestListReader, SchemaRef, SnapshotRef, TableMetadata, TableMetadataRef};
31use crate::{Error, ErrorKind, Result, TableIdent};
32
33pub struct TableBuilder {
35 file_io: Option<FileIO>,
36 metadata_location: Option<String>,
37 metadata: Option<TableMetadataRef>,
38 identifier: Option<TableIdent>,
39 kms_client: Option<Arc<dyn KeyManagementClient>>,
40 readonly: bool,
41 disable_cache: bool,
42 cache_size_bytes: Option<u64>,
43 runtime: Option<Runtime>,
44}
45
46impl TableBuilder {
47 pub(crate) fn new() -> Self {
48 Self {
49 file_io: None,
50 metadata_location: None,
51 metadata: None,
52 identifier: None,
53 kms_client: None,
54 readonly: false,
55 disable_cache: false,
56 cache_size_bytes: None,
57 runtime: None,
58 }
59 }
60
61 pub fn file_io(mut self, file_io: FileIO) -> Self {
63 self.file_io = Some(file_io);
64 self
65 }
66
67 pub fn metadata_location<T: Into<String>>(mut self, metadata_location: T) -> Self {
69 self.metadata_location = Some(metadata_location.into());
70 self
71 }
72
73 pub fn metadata<T: Into<TableMetadataRef>>(mut self, metadata: T) -> Self {
75 self.metadata = Some(metadata.into());
76 self
77 }
78
79 pub fn identifier(mut self, identifier: TableIdent) -> Self {
81 self.identifier = Some(identifier);
82 self
83 }
84
85 pub fn readonly(mut self, readonly: bool) -> Self {
87 self.readonly = readonly;
88 self
89 }
90
91 pub fn disable_cache(mut self) -> Self {
95 self.disable_cache = true;
96 self
97 }
98
99 pub fn cache_size_bytes(mut self, cache_size_bytes: u64) -> Self {
101 self.cache_size_bytes = Some(cache_size_bytes);
102 self
103 }
104
105 pub fn runtime(mut self, runtime: Runtime) -> Self {
107 self.runtime = Some(runtime);
108 self
109 }
110
111 pub fn kms_client(mut self, kms_client: Arc<dyn KeyManagementClient>) -> Self {
117 self.kms_client = Some(kms_client);
118 self
119 }
120
121 pub fn build(self) -> Result<Table> {
123 let Self {
124 file_io,
125 metadata_location,
126 metadata,
127 identifier,
128 kms_client,
129 readonly,
130 disable_cache,
131 cache_size_bytes,
132 runtime,
133 } = self;
134
135 let Some(file_io) = file_io else {
136 return Err(Error::new(
137 ErrorKind::DataInvalid,
138 "FileIO must be provided with TableBuilder.file_io()",
139 ));
140 };
141
142 let Some(metadata) = metadata else {
143 return Err(Error::new(
144 ErrorKind::DataInvalid,
145 "TableMetadataRef must be provided with TableBuilder.metadata()",
146 ));
147 };
148
149 let Some(identifier) = identifier else {
150 return Err(Error::new(
151 ErrorKind::DataInvalid,
152 "TableIdent must be provided with TableBuilder.identifier()",
153 ));
154 };
155
156 let Some(runtime) = runtime else {
157 return Err(Error::new(
158 ErrorKind::DataInvalid,
159 "Runtime must be provided with TableBuilder.runtime()",
160 ));
161 };
162
163 let encryption_manager =
164 EncryptionManager::from_table_metadata(kms_client.as_ref(), &metadata)?;
165
166 let object_cache = if disable_cache {
167 Arc::new(ObjectCache::with_disabled_cache(
168 file_io.clone(),
169 encryption_manager.clone(),
170 ))
171 } else if let Some(cache_size_bytes) = cache_size_bytes {
172 Arc::new(ObjectCache::new_with_capacity(
173 file_io.clone(),
174 cache_size_bytes,
175 encryption_manager.clone(),
176 ))
177 } else {
178 Arc::new(ObjectCache::new(
179 file_io.clone(),
180 encryption_manager.clone(),
181 ))
182 };
183
184 Ok(Table {
185 file_io,
186 metadata_location,
187 metadata,
188 identifier,
189 readonly,
190 object_cache,
191 runtime,
192 encryption_manager,
193 })
194 }
195}
196
197#[derive(Debug, Clone)]
199pub struct Table {
200 file_io: FileIO,
201 metadata_location: Option<String>,
202 metadata: TableMetadataRef,
203 identifier: TableIdent,
204 readonly: bool,
205 object_cache: Arc<ObjectCache>,
206 runtime: Runtime,
207 encryption_manager: Option<Arc<EncryptionManager>>,
208}
209
210impl Table {
211 pub(crate) fn with_metadata(mut self, metadata: TableMetadataRef) -> Self {
213 self.metadata = metadata;
214 self
215 }
216
217 pub(crate) fn with_metadata_location(mut self, metadata_location: String) -> Self {
219 self.metadata_location = Some(metadata_location);
220 self
221 }
222
223 pub fn builder() -> TableBuilder {
225 TableBuilder::new()
226 }
227
228 pub fn identifier(&self) -> &TableIdent {
230 &self.identifier
231 }
232 pub fn metadata(&self) -> &TableMetadata {
234 &self.metadata
235 }
236
237 pub fn metadata_ref(&self) -> TableMetadataRef {
239 self.metadata.clone()
240 }
241
242 pub fn metadata_location(&self) -> Option<&str> {
244 self.metadata_location.as_deref()
245 }
246
247 pub fn metadata_location_result(&self) -> Result<&str> {
249 self.metadata_location.as_deref().ok_or(Error::new(
250 ErrorKind::DataInvalid,
251 format!(
252 "Metadata location does not exist for table: {}",
253 self.identifier
254 ),
255 ))
256 }
257
258 pub fn file_io(&self) -> &FileIO {
260 &self.file_io
261 }
262
263 pub(crate) fn object_cache(&self) -> Arc<ObjectCache> {
265 self.object_cache.clone()
266 }
267
268 pub fn encryption_manager(&self) -> Option<&EncryptionManager> {
275 self.encryption_manager.as_deref()
276 }
277
278 pub fn scan(&self) -> TableScanBuilder<'_> {
280 TableScanBuilder::new(self)
281 }
282
283 pub fn inspect(&self) -> MetadataTable<'_> {
286 MetadataTable::new(self)
287 }
288
289 pub(crate) fn runtime(&self) -> &Runtime {
291 &self.runtime
292 }
293
294 pub fn readonly(&self) -> bool {
296 self.readonly
297 }
298
299 pub fn current_schema_ref(&self) -> SchemaRef {
301 self.metadata.current_schema().clone()
302 }
303
304 pub fn manifest_list_reader(&self, snapshot: &SnapshotRef) -> ManifestListReader {
306 ManifestListReader::new(
307 snapshot.clone(),
308 self.file_io.clone(),
309 self.metadata.clone(),
310 self.encryption_manager.clone(),
311 )
312 }
313
314 pub fn reader_builder(&self) -> ArrowReaderBuilder {
316 ArrowReaderBuilder::new(self.file_io.clone(), self.runtime().clone())
317 }
318}
319
320#[derive(Debug, Clone)]
344pub struct StaticTable(Table);
345
346impl StaticTable {
347 pub async fn from_metadata(
349 metadata: TableMetadata,
350 table_ident: TableIdent,
351 file_io: FileIO,
352 ) -> Result<Self> {
353 let table = Table::builder()
354 .metadata(metadata)
355 .identifier(table_ident)
356 .file_io(file_io.clone())
357 .runtime(Runtime::try_current()?)
358 .readonly(true)
359 .build();
360
361 Ok(Self(table?))
362 }
363 pub async fn from_metadata_file(
365 metadata_location: &str,
366 table_ident: TableIdent,
367 file_io: FileIO,
368 ) -> Result<Self> {
369 let metadata = TableMetadata::read_from(&file_io, metadata_location).await?;
370
371 let table = Table::builder()
372 .metadata(metadata)
373 .metadata_location(metadata_location)
374 .identifier(table_ident)
375 .file_io(file_io.clone())
376 .runtime(Runtime::try_current()?)
377 .readonly(true)
378 .build();
379
380 Ok(Self(table?))
381 }
382
383 pub fn scan(&self) -> TableScanBuilder<'_> {
385 self.0.scan()
386 }
387
388 pub fn metadata(&self) -> TableMetadataRef {
390 self.0.metadata_ref()
391 }
392
393 pub fn into_table(self) -> Table {
397 self.0
398 }
399
400 pub fn reader_builder(&self) -> ArrowReaderBuilder {
402 self.0.reader_builder()
403 }
404}
405
#[cfg(test)]
mod tests {
    use std::fs;

    use super::*;
    use crate::encryption::SensitiveBytes;
    use crate::encryption::kms::MemoryKeyManagementClient;
    use crate::spec::TableProperties;

    /// Path of a fixture inside the crate's `testdata/table_metadata` directory.
    fn table_metadata_path(filename: &str) -> String {
        format!(
            "{}/testdata/table_metadata/{}",
            env!("CARGO_MANIFEST_DIR"),
            filename
        )
    }

    /// Reads a table-metadata fixture from disk and deserializes it from JSON.
    fn load_test_metadata(filename: &str) -> TableMetadata {
        let raw = fs::read_to_string(table_metadata_path(filename)).unwrap();
        serde_json::from_str(&raw).unwrap()
    }

    /// Loads the metadata file at `metadata_path` as a [`StaticTable`] identified as
    /// `static_ns.static_table`, using a filesystem-backed [`FileIO`].
    async fn load_static_table(metadata_path: &str) -> StaticTable {
        let ident = TableIdent::from_strs(["static_ns", "static_table"]).unwrap();
        StaticTable::from_metadata_file(metadata_path, ident, FileIO::new_with_fs())
            .await
            .unwrap()
    }

    /// KMS client holding a single master key registered under `master-1`.
    fn make_kms() -> Arc<dyn KeyManagementClient> {
        let client = MemoryKeyManagementClient::new();
        client.add_master_key("master-1").unwrap();
        Arc::new(client)
    }

    #[tokio::test]
    async fn test_static_table_from_file() {
        let path = table_metadata_path("TableMetadataV2Valid.json");
        let static_table = load_static_table(&path).await;

        let metadata = static_table.metadata();
        let current = metadata.current_snapshot().unwrap();
        assert_eq!(
            current.snapshot_id(),
            3055729675574597004,
            "snapshot id from metadata don't match"
        );
    }

    #[tokio::test]
    async fn test_static_into_table() {
        let path = table_metadata_path("TableMetadataV2Valid.json");
        let table = load_static_table(&path).await.into_table();

        assert!(table.readonly());
        assert_eq!(table.identifier().name(), "static_table");
        assert_eq!(table.metadata_location(), Some(path.as_str()));
    }

    #[tokio::test]
    async fn test_table_readonly_flag() {
        let file_io = FileIO::new_with_fs();
        let input = file_io
            .new_input(table_metadata_path("TableMetadataV2Valid.json"))
            .unwrap();
        let raw = input.read().await.unwrap();
        let table_metadata: TableMetadata = serde_json::from_slice(&raw).unwrap();

        // `readonly` is deliberately left untouched to observe the builder default.
        let table = Table::builder()
            .metadata(table_metadata)
            .identifier(TableIdent::from_strs(["ns", "table"]).unwrap())
            .file_io(file_io)
            .runtime(Runtime::try_current().unwrap())
            .build()
            .unwrap();

        assert!(!table.readonly());
        assert_eq!(table.identifier().name(), "table");
    }

    #[tokio::test]
    async fn table_decrypts_manifest_list_via_object_cache() {
        let mut metadata = load_test_metadata("TableMetadataV3ValidEncryption.json");

        // Redirect snapshot 1 to the encrypted manifest-list fixture in the crate's testdata.
        let fixture = format!(
            "{}/testdata/manifests_lists/manifest-list-v3-encrypted.avro",
            env!("CARGO_MANIFEST_DIR"),
        );
        let slot = metadata.snapshots.get_mut(&1).unwrap();
        let mut rewritten = (**slot).clone();
        rewritten.manifest_list = fixture;
        *slot = Arc::new(rewritten);

        // Register explicit 16-byte key material under `master-1`.
        let client = MemoryKeyManagementClient::new();
        client
            .add_master_key_bytes(
                "master-1",
                SensitiveBytes::new([
                    0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c,
                    0x0d, 0x0e, 0x0f,
                ]),
            )
            .unwrap();
        let kms: Arc<dyn KeyManagementClient> = Arc::new(client);

        let table = Table::builder()
            .file_io(FileIO::new_with_fs())
            .metadata(metadata)
            .identifier(TableIdent::from_strs(["ns", "enc"]).unwrap())
            .kms_client(kms)
            .runtime(Runtime::try_current().unwrap())
            .build()
            .unwrap();

        let current = table.metadata().current_snapshot().unwrap();
        let manifest_list = table
            .object_cache()
            .get_manifest_list(current, &table.metadata_ref())
            .await
            .unwrap();
        assert_eq!(manifest_list.entries().len(), 0);
    }

    #[tokio::test]
    async fn table_builder_errors_when_encryption_key_id_set_but_no_kms() {
        let metadata = load_test_metadata("TableMetadataV3ValidEncryption.json");

        // No KMS client is configured on purpose.
        let result = Table::builder()
            .file_io(FileIO::new_with_memory())
            .metadata(metadata)
            .identifier(TableIdent::from_strs(["ns", "enc"]).unwrap())
            .runtime(Runtime::try_current().unwrap())
            .build();

        let err = result.unwrap_err();
        assert_eq!(err.kind(), ErrorKind::PreconditionFailed);
    }

    #[tokio::test]
    async fn table_builder_skips_encryption_on_pre_v3_table() {
        let mut metadata = load_test_metadata("TableMetadataV2ValidMinimal.json");
        // Set the encryption key id property on the V2 fixture.
        metadata.properties.insert(
            TableProperties::PROPERTY_ENCRYPTION_KEY_ID.to_string(),
            "master-1".to_string(),
        );

        let table = Table::builder()
            .file_io(FileIO::new_with_memory())
            .metadata(metadata)
            .identifier(TableIdent::from_strs(["ns", "enc"]).unwrap())
            .kms_client(make_kms())
            .runtime(Runtime::try_current().unwrap())
            .build()
            .unwrap();

        assert!(table.encryption_manager().is_none());
    }

    #[tokio::test]
    async fn table_builder_skips_encryption_when_property_absent() {
        let metadata = load_test_metadata("TableMetadataV3ValidMinimal.json");

        let table = Table::builder()
            .file_io(FileIO::new_with_memory())
            .metadata(metadata)
            .identifier(TableIdent::from_strs(["ns", "plain"]).unwrap())
            .kms_client(make_kms())
            .runtime(Runtime::try_current().unwrap())
            .build()
            .unwrap();

        assert!(table.encryption_manager().is_none());
    }
}