Skip to main content

iceberg/
table.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Table API for Apache Iceberg
19
20use std::sync::Arc;
21
22use crate::arrow::ArrowReaderBuilder;
23use crate::encryption::EncryptionManager;
24use crate::encryption::kms::KeyManagementClient;
25use crate::inspect::MetadataTable;
26use crate::io::FileIO;
27use crate::io::object_cache::ObjectCache;
28use crate::runtime::Runtime;
29use crate::scan::TableScanBuilder;
30use crate::spec::{ManifestListReader, SchemaRef, SnapshotRef, TableMetadata, TableMetadataRef};
31use crate::{Error, ErrorKind, Result, TableIdent};
32
33/// Builder to create table scan.
34pub struct TableBuilder {
35    file_io: Option<FileIO>,
36    metadata_location: Option<String>,
37    metadata: Option<TableMetadataRef>,
38    identifier: Option<TableIdent>,
39    kms_client: Option<Arc<dyn KeyManagementClient>>,
40    readonly: bool,
41    disable_cache: bool,
42    cache_size_bytes: Option<u64>,
43    runtime: Option<Runtime>,
44}
45
46impl TableBuilder {
47    pub(crate) fn new() -> Self {
48        Self {
49            file_io: None,
50            metadata_location: None,
51            metadata: None,
52            identifier: None,
53            kms_client: None,
54            readonly: false,
55            disable_cache: false,
56            cache_size_bytes: None,
57            runtime: None,
58        }
59    }
60
61    /// required - sets the necessary FileIO to use for the table
62    pub fn file_io(mut self, file_io: FileIO) -> Self {
63        self.file_io = Some(file_io);
64        self
65    }
66
67    /// optional - sets the tables metadata location
68    pub fn metadata_location<T: Into<String>>(mut self, metadata_location: T) -> Self {
69        self.metadata_location = Some(metadata_location.into());
70        self
71    }
72
73    /// required - passes in the TableMetadata to use for the Table
74    pub fn metadata<T: Into<TableMetadataRef>>(mut self, metadata: T) -> Self {
75        self.metadata = Some(metadata.into());
76        self
77    }
78
79    /// required - passes in the TableIdent to use for the Table
80    pub fn identifier(mut self, identifier: TableIdent) -> Self {
81        self.identifier = Some(identifier);
82        self
83    }
84
85    /// specifies if the Table is readonly or not (default not)
86    pub fn readonly(mut self, readonly: bool) -> Self {
87        self.readonly = readonly;
88        self
89    }
90
91    /// specifies if the Table's metadata cache will be disabled,
92    /// so that reads of Manifests and ManifestLists will never
93    /// get cached.
94    pub fn disable_cache(mut self) -> Self {
95        self.disable_cache = true;
96        self
97    }
98
99    /// optionally set a non-default metadata cache size
100    pub fn cache_size_bytes(mut self, cache_size_bytes: u64) -> Self {
101        self.cache_size_bytes = Some(cache_size_bytes);
102        self
103    }
104
105    /// Set the Runtime for this table to use when spawning tasks.
106    pub fn runtime(mut self, runtime: Runtime) -> Self {
107        self.runtime = Some(runtime);
108        self
109    }
110
111    /// optional - sets the KMS client used to unwrap keys for table encryption.
112    ///
113    /// If the table metadata has the `encryption.key-id` property set, a
114    /// [`KeyManagementClient`] must be provided here so the table can build
115    /// an [`EncryptionManager`]; otherwise [`Self::build`] will return an error.
116    pub fn kms_client(mut self, kms_client: Arc<dyn KeyManagementClient>) -> Self {
117        self.kms_client = Some(kms_client);
118        self
119    }
120
121    /// build the Table
122    pub fn build(self) -> Result<Table> {
123        let Self {
124            file_io,
125            metadata_location,
126            metadata,
127            identifier,
128            kms_client,
129            readonly,
130            disable_cache,
131            cache_size_bytes,
132            runtime,
133        } = self;
134
135        let Some(file_io) = file_io else {
136            return Err(Error::new(
137                ErrorKind::DataInvalid,
138                "FileIO must be provided with TableBuilder.file_io()",
139            ));
140        };
141
142        let Some(metadata) = metadata else {
143            return Err(Error::new(
144                ErrorKind::DataInvalid,
145                "TableMetadataRef must be provided with TableBuilder.metadata()",
146            ));
147        };
148
149        let Some(identifier) = identifier else {
150            return Err(Error::new(
151                ErrorKind::DataInvalid,
152                "TableIdent must be provided with TableBuilder.identifier()",
153            ));
154        };
155
156        let Some(runtime) = runtime else {
157            return Err(Error::new(
158                ErrorKind::DataInvalid,
159                "Runtime must be provided with TableBuilder.runtime()",
160            ));
161        };
162
163        let encryption_manager =
164            EncryptionManager::from_table_metadata(kms_client.as_ref(), &metadata)?;
165
166        let object_cache = if disable_cache {
167            Arc::new(ObjectCache::with_disabled_cache(
168                file_io.clone(),
169                encryption_manager.clone(),
170            ))
171        } else if let Some(cache_size_bytes) = cache_size_bytes {
172            Arc::new(ObjectCache::new_with_capacity(
173                file_io.clone(),
174                cache_size_bytes,
175                encryption_manager.clone(),
176            ))
177        } else {
178            Arc::new(ObjectCache::new(
179                file_io.clone(),
180                encryption_manager.clone(),
181            ))
182        };
183
184        Ok(Table {
185            file_io,
186            metadata_location,
187            metadata,
188            identifier,
189            readonly,
190            object_cache,
191            runtime,
192            encryption_manager,
193        })
194    }
195}
196
197/// Table represents a table in the catalog.
198#[derive(Debug, Clone)]
199pub struct Table {
200    file_io: FileIO,
201    metadata_location: Option<String>,
202    metadata: TableMetadataRef,
203    identifier: TableIdent,
204    readonly: bool,
205    object_cache: Arc<ObjectCache>,
206    runtime: Runtime,
207    encryption_manager: Option<Arc<EncryptionManager>>,
208}
209
210impl Table {
211    /// Sets the [`Table`] metadata and returns an updated instance with the new metadata applied.
212    pub(crate) fn with_metadata(mut self, metadata: TableMetadataRef) -> Self {
213        self.metadata = metadata;
214        self
215    }
216
217    /// Sets the [`Table`] metadata location and returns an updated instance.
218    pub(crate) fn with_metadata_location(mut self, metadata_location: String) -> Self {
219        self.metadata_location = Some(metadata_location);
220        self
221    }
222
223    /// Returns a TableBuilder to build a table
224    pub fn builder() -> TableBuilder {
225        TableBuilder::new()
226    }
227
228    /// Returns table identifier.
229    pub fn identifier(&self) -> &TableIdent {
230        &self.identifier
231    }
232    /// Returns current metadata.
233    pub fn metadata(&self) -> &TableMetadata {
234        &self.metadata
235    }
236
237    /// Returns current metadata ref.
238    pub fn metadata_ref(&self) -> TableMetadataRef {
239        self.metadata.clone()
240    }
241
242    /// Returns current metadata location.
243    pub fn metadata_location(&self) -> Option<&str> {
244        self.metadata_location.as_deref()
245    }
246
247    /// Returns current metadata location in a result.
248    pub fn metadata_location_result(&self) -> Result<&str> {
249        self.metadata_location.as_deref().ok_or(Error::new(
250            ErrorKind::DataInvalid,
251            format!(
252                "Metadata location does not exist for table: {}",
253                self.identifier
254            ),
255        ))
256    }
257
258    /// Returns file io used in this table.
259    pub fn file_io(&self) -> &FileIO {
260        &self.file_io
261    }
262
263    /// Returns this table's object cache
264    pub(crate) fn object_cache(&self) -> Arc<ObjectCache> {
265        self.object_cache.clone()
266    }
267
268    /// Returns the [`EncryptionManager`] for this table, if encryption is
269    /// configured.
270    ///
271    /// A manager is present iff the table metadata has the
272    /// `encryption.key-id` property set and a [`KeyManagementClient`] was
273    /// supplied to the [`TableBuilder`].
274    pub fn encryption_manager(&self) -> Option<&EncryptionManager> {
275        self.encryption_manager.as_deref()
276    }
277
278    /// Creates a table scan.
279    pub fn scan(&self) -> TableScanBuilder<'_> {
280        TableScanBuilder::new(self)
281    }
282
283    /// Creates a metadata table which provides table-like APIs for inspecting metadata.
284    /// See [`MetadataTable`] for more details.
285    pub fn inspect(&self) -> MetadataTable<'_> {
286        MetadataTable::new(self)
287    }
288
289    /// Returns the [`Runtime`] for this table.
290    pub(crate) fn runtime(&self) -> &Runtime {
291        &self.runtime
292    }
293
294    /// Returns the flag indicating whether the `Table` is readonly or not
295    pub fn readonly(&self) -> bool {
296        self.readonly
297    }
298
299    /// Returns the current schema as a shared reference.
300    pub fn current_schema_ref(&self) -> SchemaRef {
301        self.metadata.current_schema().clone()
302    }
303
304    /// Creates a [`ManifestListReader`] for the given snapshot.
305    pub fn manifest_list_reader(&self, snapshot: &SnapshotRef) -> ManifestListReader {
306        ManifestListReader::new(
307            snapshot.clone(),
308            self.file_io.clone(),
309            self.metadata.clone(),
310            self.encryption_manager.clone(),
311        )
312    }
313
314    /// Create a reader for the table.
315    pub fn reader_builder(&self) -> ArrowReaderBuilder {
316        ArrowReaderBuilder::new(self.file_io.clone(), self.runtime().clone())
317    }
318}
319
320/// `StaticTable` is a read-only table struct that can be created from a metadata file or from `TableMetaData` without a catalog.
321/// It can only be used to read metadata and for table scan.
322/// # Examples
323///
324/// ```rust, no_run
325/// # use iceberg::io::FileIO;
326/// # use iceberg::table::StaticTable;
327/// # use iceberg::TableIdent;
328/// # async fn example() {
329/// let metadata_file_location = "s3://bucket_name/path/to/metadata.json";
330/// let file_io = FileIO::new_with_fs();
331/// let static_identifier = TableIdent::from_strs(["static_ns", "static_table"]).unwrap();
332/// let static_table =
333///     StaticTable::from_metadata_file(&metadata_file_location, static_identifier, file_io)
334///         .await
335///         .unwrap();
336/// let snapshot_id = static_table
337///     .metadata()
338///     .current_snapshot()
339///     .unwrap()
340///     .snapshot_id();
341/// # }
342/// ```
343#[derive(Debug, Clone)]
344pub struct StaticTable(Table);
345
346impl StaticTable {
347    /// Creates a static table from a given `TableMetadata` and `FileIO`
348    pub async fn from_metadata(
349        metadata: TableMetadata,
350        table_ident: TableIdent,
351        file_io: FileIO,
352    ) -> Result<Self> {
353        let table = Table::builder()
354            .metadata(metadata)
355            .identifier(table_ident)
356            .file_io(file_io.clone())
357            .runtime(Runtime::try_current()?)
358            .readonly(true)
359            .build();
360
361        Ok(Self(table?))
362    }
363    /// Creates a static table directly from metadata file and `FileIO`
364    pub async fn from_metadata_file(
365        metadata_location: &str,
366        table_ident: TableIdent,
367        file_io: FileIO,
368    ) -> Result<Self> {
369        let metadata = TableMetadata::read_from(&file_io, metadata_location).await?;
370
371        let table = Table::builder()
372            .metadata(metadata)
373            .metadata_location(metadata_location)
374            .identifier(table_ident)
375            .file_io(file_io.clone())
376            .runtime(Runtime::try_current()?)
377            .readonly(true)
378            .build();
379
380        Ok(Self(table?))
381    }
382
383    /// Create a TableScanBuilder for the static table.
384    pub fn scan(&self) -> TableScanBuilder<'_> {
385        self.0.scan()
386    }
387
388    /// Get TableMetadataRef for the static table
389    pub fn metadata(&self) -> TableMetadataRef {
390        self.0.metadata_ref()
391    }
392
393    /// Consumes the `StaticTable` and return it as a `Table`
394    /// Please use this method carefully as the Table it returns remains detached from a catalog
395    /// and can't be used to perform modifications on the table.
396    pub fn into_table(self) -> Table {
397        self.0
398    }
399
400    /// Create a reader for the table.
401    pub fn reader_builder(&self) -> ArrowReaderBuilder {
402        self.0.reader_builder()
403    }
404}
405
#[cfg(test)]
mod tests {
    use std::fs;

    use super::*;
    use crate::encryption::SensitiveBytes;
    use crate::encryption::kms::MemoryKeyManagementClient;
    use crate::spec::TableProperties;

    /// Absolute path of a fixture file under `testdata/table_metadata`.
    fn metadata_path(filename: &str) -> String {
        format!(
            "{}/testdata/table_metadata/{}",
            env!("CARGO_MANIFEST_DIR"),
            filename
        )
    }

    /// Reads a table-metadata fixture from disk and deserializes it.
    fn load_test_metadata(filename: &str) -> TableMetadata {
        let json = fs::read_to_string(metadata_path(filename)).unwrap();
        serde_json::from_str(&json).unwrap()
    }

    #[tokio::test]
    async fn test_static_table_from_file() {
        let metadata_file_path = metadata_path("TableMetadataV2Valid.json");
        let file_io = FileIO::new_with_fs();
        let static_identifier = TableIdent::from_strs(["static_ns", "static_table"]).unwrap();
        let static_table =
            StaticTable::from_metadata_file(&metadata_file_path, static_identifier, file_io)
                .await
                .unwrap();
        let snapshot_id = static_table
            .metadata()
            .current_snapshot()
            .unwrap()
            .snapshot_id();
        assert_eq!(
            snapshot_id, 3055729675574597004,
            "snapshot id from metadata don't match"
        );
    }

    #[tokio::test]
    async fn test_static_into_table() {
        let metadata_file_path = metadata_path("TableMetadataV2Valid.json");
        let file_io = FileIO::new_with_fs();
        let static_identifier = TableIdent::from_strs(["static_ns", "static_table"]).unwrap();
        let static_table =
            StaticTable::from_metadata_file(&metadata_file_path, static_identifier, file_io)
                .await
                .unwrap();
        let table = static_table.into_table();
        assert!(table.readonly());
        assert_eq!(table.identifier.name(), "static_table");
        assert_eq!(
            table.metadata_location(),
            Some(metadata_file_path.as_str())
        );
    }

    #[tokio::test]
    async fn test_table_readonly_flag() {
        // Read through FileIO (not `load_test_metadata`) to also exercise the FS input path.
        let metadata_file_path = metadata_path("TableMetadataV2Valid.json");
        let file_io = FileIO::new_with_fs();
        let metadata_file = file_io.new_input(metadata_file_path).unwrap();
        let metadata_file_content = metadata_file.read().await.unwrap();
        let table_metadata =
            serde_json::from_slice::<TableMetadata>(&metadata_file_content).unwrap();
        let static_identifier = TableIdent::from_strs(["ns", "table"]).unwrap();
        let table = Table::builder()
            .metadata(table_metadata)
            .identifier(static_identifier)
            .file_io(file_io.clone())
            .runtime(Runtime::try_current().unwrap())
            .build()
            .unwrap();
        assert!(!table.readonly());
        assert_eq!(table.identifier.name(), "table");
    }

    /// In-memory KMS seeded with a freshly generated `master-1` key.
    fn make_kms() -> Arc<dyn KeyManagementClient> {
        let kms = MemoryKeyManagementClient::new();
        kms.add_master_key("master-1").unwrap();
        Arc::new(kms)
    }

    #[tokio::test]
    async fn table_decrypts_manifest_list_via_object_cache() {
        // The fixture contains a snapshot with key-id, encryption-keys (KEK + wrapped DEK),
        // all generated with the master key bytes below.
        let mut metadata: TableMetadata = load_test_metadata("TableMetadataV3ValidEncryption.json");

        // Point the snapshot's manifest-list at the testdata file on disk.
        let manifest_list_path = format!(
            "{}/testdata/manifests_lists/manifest-list-v3-encrypted.avro",
            env!("CARGO_MANIFEST_DIR"),
        );
        let snapshot = metadata.snapshots.get_mut(&1).unwrap();
        let mut patched = snapshot.as_ref().clone();
        patched.manifest_list = manifest_list_path;
        *snapshot = Arc::new(patched);

        // Seed the KMS with the same master key bytes used to generate the fixture.
        let kms: Arc<dyn KeyManagementClient> = {
            let k = MemoryKeyManagementClient::new();
            k.add_master_key_bytes(
                "master-1",
                SensitiveBytes::new([
                    0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c,
                    0x0d, 0x0e, 0x0f,
                ]),
            )
            .unwrap();
            Arc::new(k)
        };

        let table = Table::builder()
            .file_io(FileIO::new_with_fs())
            .metadata(metadata)
            .identifier(TableIdent::from_strs(["ns", "enc"]).unwrap())
            .kms_client(kms)
            .runtime(Runtime::try_current().unwrap())
            .build()
            .unwrap();

        let snapshot_ref = table.metadata().current_snapshot().unwrap();
        let manifest_list = table
            .object_cache()
            .get_manifest_list(snapshot_ref, &table.metadata_ref())
            .await
            .unwrap();
        assert_eq!(manifest_list.entries().len(), 0);
    }

    #[tokio::test]
    async fn table_builder_errors_when_encryption_key_id_set_but_no_kms() {
        let metadata: TableMetadata = load_test_metadata("TableMetadataV3ValidEncryption.json");

        let err = Table::builder()
            .file_io(FileIO::new_with_memory())
            .metadata(metadata)
            .identifier(TableIdent::from_strs(["ns", "enc"]).unwrap())
            .runtime(Runtime::try_current().unwrap())
            .build()
            .unwrap_err();
        assert_eq!(err.kind(), ErrorKind::PreconditionFailed);
    }

    #[tokio::test]
    async fn table_builder_skips_encryption_on_pre_v3_table() {
        // Encryption is a v3 spec feature; pre-v3 tables silently skip
        // encryption even if encryption.key-id is set.
        let mut metadata: TableMetadata = load_test_metadata("TableMetadataV2ValidMinimal.json");
        metadata.properties.insert(
            TableProperties::PROPERTY_ENCRYPTION_KEY_ID.to_string(),
            "master-1".to_string(),
        );

        let table = Table::builder()
            .file_io(FileIO::new_with_memory())
            .metadata(metadata)
            .identifier(TableIdent::from_strs(["ns", "enc"]).unwrap())
            .kms_client(make_kms())
            .runtime(Runtime::try_current().unwrap())
            .build()
            .unwrap();
        assert!(table.encryption_manager().is_none());
    }

    #[tokio::test]
    async fn table_builder_skips_encryption_when_property_absent() {
        let metadata: TableMetadata = load_test_metadata("TableMetadataV3ValidMinimal.json");
        let table = Table::builder()
            .file_io(FileIO::new_with_memory())
            .metadata(metadata)
            .identifier(TableIdent::from_strs(["ns", "plain"]).unwrap())
            .kms_client(make_kms())
            .runtime(Runtime::try_current().unwrap())
            .build()
            .unwrap();
        assert!(table.encryption_manager().is_none());
    }
}