Skip to main content

iceberg/spec/manifest_list/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use super::ManifestList;
21use crate::encryption::{EncryptedInputFile, EncryptionManager};
22use crate::error::Result;
23use crate::io::FileIO;
24use crate::spec::{SnapshotRef, TableMetadataRef};
25use crate::{Error, ErrorKind};
26
27/// A manifest list reader that encapsulates the logic for loading and parsing a [`ManifestList`]
28/// from a snapshot.
29pub struct ManifestListReader {
30    snapshot: SnapshotRef,
31    file_io: FileIO,
32    table_metadata: TableMetadataRef,
33    encryption_manager: Option<Arc<EncryptionManager>>,
34}
35
36impl ManifestListReader {
37    pub(crate) fn new(
38        snapshot: SnapshotRef,
39        file_io: FileIO,
40        table_metadata: TableMetadataRef,
41        encryption_manager: Option<Arc<EncryptionManager>>,
42    ) -> Self {
43        Self {
44            snapshot,
45            file_io,
46            table_metadata,
47            encryption_manager,
48        }
49    }
50
51    /// Loads and returns the [`ManifestList`] for this snapshot.
52    pub async fn load(&self) -> Result<ManifestList> {
53        let manifest_list_content = match self.snapshot.encryption_key_id() {
54            Some(key_id) => {
55                let em = self.encryption_manager.as_ref().ok_or_else(|| {
56                    Error::new(
57                        ErrorKind::PreconditionFailed,
58                        "Snapshot has encryption_key_id but no EncryptionManager configured on Table",
59                    )
60                })?;
61                let key_metadata = em.decrypt_manifest_list_key_metadata(key_id).await?;
62                let input = self.file_io.new_input(self.snapshot.manifest_list())?;
63                EncryptedInputFile::new(input, key_metadata).read().await?
64            }
65            None => {
66                self.file_io
67                    .new_input(self.snapshot.manifest_list())?
68                    .read()
69                    .await?
70            }
71        };
72        ManifestList::parse_with_version(
73            &manifest_list_content,
74            self.table_metadata.format_version(),
75        )
76    }
77}
78
79#[cfg(test)]
80mod tests {
81    use std::sync::Arc;
82
83    use super::ManifestListReader;
84    use crate::encryption::kms::MemoryKeyManagementClient;
85    use crate::encryption::{EncryptionManager, SensitiveBytes};
86    use crate::io::FileIO;
87    use crate::spec::{TableMetadata, TableMetadataRef};
88
89    fn encryption_test_metadata() -> TableMetadata {
90        let path = format!(
91            "{}/testdata/table_metadata/TableMetadataV3ValidEncryption.json",
92            env!("CARGO_MANIFEST_DIR"),
93        );
94        let json = std::fs::read_to_string(path).unwrap();
95        serde_json::from_str(&json).unwrap()
96    }
97
98    #[tokio::test]
99    async fn load_manifest_list_errors_when_encrypted_but_no_manager_configured() {
100        let mut metadata = encryption_test_metadata();
101
102        let manifest_list_path = format!(
103            "{}/testdata/manifests_lists/manifest-list-v3-encrypted.avro",
104            env!("CARGO_MANIFEST_DIR"),
105        );
106        let snapshot = metadata.snapshots.get_mut(&1).unwrap();
107        let mut patched = snapshot.as_ref().clone();
108        patched.manifest_list = manifest_list_path;
109        *snapshot = Arc::new(patched);
110
111        let snapshot_ref = metadata.current_snapshot().unwrap().clone();
112        let metadata_ref = TableMetadataRef::new(metadata);
113
114        let err = ManifestListReader::new(snapshot_ref, FileIO::new_with_fs(), metadata_ref, None)
115            .load()
116            .await
117            .unwrap_err();
118        assert_eq!(err.kind(), crate::ErrorKind::PreconditionFailed);
119    }
120
121    #[tokio::test]
122    async fn load_manifest_list_decrypts_roundtrip() {
123        let mut metadata = encryption_test_metadata();
124
125        let manifest_list_path = format!(
126            "{}/testdata/manifests_lists/manifest-list-v3-encrypted.avro",
127            env!("CARGO_MANIFEST_DIR"),
128        );
129        let snapshot = metadata.snapshots.get_mut(&1).unwrap();
130        let mut patched = snapshot.as_ref().clone();
131        patched.manifest_list = manifest_list_path;
132        *snapshot = Arc::new(patched);
133
134        let kms = MemoryKeyManagementClient::new();
135        kms.add_master_key_bytes(
136            "master-1",
137            SensitiveBytes::new([
138                0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d,
139                0x0e, 0x0f,
140            ]),
141        )
142        .unwrap();
143
144        let mgr = EncryptionManager::builder()
145            .kms_client(Arc::new(kms) as Arc<dyn crate::encryption::kms::KeyManagementClient>)
146            .table_key_id("master-1")
147            .encryption_keys(metadata.encryption_keys.clone())
148            .build();
149
150        let snapshot_ref = metadata.current_snapshot().unwrap().clone();
151        let metadata_ref = TableMetadataRef::new(metadata);
152
153        let manifest_list = ManifestListReader::new(
154            snapshot_ref,
155            FileIO::new_with_fs(),
156            metadata_ref,
157            Some(Arc::new(mgr)),
158        )
159        .load()
160        .await
161        .unwrap();
162        assert_eq!(manifest_list.entries().len(), 0);
163    }
164}