iceberg/spec/manifest_list/
reader.rs1use std::sync::Arc;
19
20use super::ManifestList;
21use crate::encryption::{EncryptedInputFile, EncryptionManager};
22use crate::error::Result;
23use crate::io::FileIO;
24use crate::spec::{SnapshotRef, TableMetadataRef};
25use crate::{Error, ErrorKind};
26
27pub struct ManifestListReader {
30 snapshot: SnapshotRef,
31 file_io: FileIO,
32 table_metadata: TableMetadataRef,
33 encryption_manager: Option<Arc<EncryptionManager>>,
34}
35
36impl ManifestListReader {
37 pub(crate) fn new(
38 snapshot: SnapshotRef,
39 file_io: FileIO,
40 table_metadata: TableMetadataRef,
41 encryption_manager: Option<Arc<EncryptionManager>>,
42 ) -> Self {
43 Self {
44 snapshot,
45 file_io,
46 table_metadata,
47 encryption_manager,
48 }
49 }
50
51 pub async fn load(&self) -> Result<ManifestList> {
53 let manifest_list_content = match self.snapshot.encryption_key_id() {
54 Some(key_id) => {
55 let em = self.encryption_manager.as_ref().ok_or_else(|| {
56 Error::new(
57 ErrorKind::PreconditionFailed,
58 "Snapshot has encryption_key_id but no EncryptionManager configured on Table",
59 )
60 })?;
61 let key_metadata = em.decrypt_manifest_list_key_metadata(key_id).await?;
62 let input = self.file_io.new_input(self.snapshot.manifest_list())?;
63 EncryptedInputFile::new(input, key_metadata).read().await?
64 }
65 None => {
66 self.file_io
67 .new_input(self.snapshot.manifest_list())?
68 .read()
69 .await?
70 }
71 };
72 ManifestList::parse_with_version(
73 &manifest_list_content,
74 self.table_metadata.format_version(),
75 )
76 }
77}
78
79#[cfg(test)]
80mod tests {
81 use std::sync::Arc;
82
83 use super::ManifestListReader;
84 use crate::encryption::kms::MemoryKeyManagementClient;
85 use crate::encryption::{EncryptionManager, SensitiveBytes};
86 use crate::io::FileIO;
87 use crate::spec::{TableMetadata, TableMetadataRef};
88
89 fn encryption_test_metadata() -> TableMetadata {
90 let path = format!(
91 "{}/testdata/table_metadata/TableMetadataV3ValidEncryption.json",
92 env!("CARGO_MANIFEST_DIR"),
93 );
94 let json = std::fs::read_to_string(path).unwrap();
95 serde_json::from_str(&json).unwrap()
96 }
97
98 #[tokio::test]
99 async fn load_manifest_list_errors_when_encrypted_but_no_manager_configured() {
100 let mut metadata = encryption_test_metadata();
101
102 let manifest_list_path = format!(
103 "{}/testdata/manifests_lists/manifest-list-v3-encrypted.avro",
104 env!("CARGO_MANIFEST_DIR"),
105 );
106 let snapshot = metadata.snapshots.get_mut(&1).unwrap();
107 let mut patched = snapshot.as_ref().clone();
108 patched.manifest_list = manifest_list_path;
109 *snapshot = Arc::new(patched);
110
111 let snapshot_ref = metadata.current_snapshot().unwrap().clone();
112 let metadata_ref = TableMetadataRef::new(metadata);
113
114 let err = ManifestListReader::new(snapshot_ref, FileIO::new_with_fs(), metadata_ref, None)
115 .load()
116 .await
117 .unwrap_err();
118 assert_eq!(err.kind(), crate::ErrorKind::PreconditionFailed);
119 }
120
121 #[tokio::test]
122 async fn load_manifest_list_decrypts_roundtrip() {
123 let mut metadata = encryption_test_metadata();
124
125 let manifest_list_path = format!(
126 "{}/testdata/manifests_lists/manifest-list-v3-encrypted.avro",
127 env!("CARGO_MANIFEST_DIR"),
128 );
129 let snapshot = metadata.snapshots.get_mut(&1).unwrap();
130 let mut patched = snapshot.as_ref().clone();
131 patched.manifest_list = manifest_list_path;
132 *snapshot = Arc::new(patched);
133
134 let kms = MemoryKeyManagementClient::new();
135 kms.add_master_key_bytes(
136 "master-1",
137 SensitiveBytes::new([
138 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d,
139 0x0e, 0x0f,
140 ]),
141 )
142 .unwrap();
143
144 let mgr = EncryptionManager::builder()
145 .kms_client(Arc::new(kms) as Arc<dyn crate::encryption::kms::KeyManagementClient>)
146 .table_key_id("master-1")
147 .encryption_keys(metadata.encryption_keys.clone())
148 .build();
149
150 let snapshot_ref = metadata.current_snapshot().unwrap().clone();
151 let metadata_ref = TableMetadataRef::new(metadata);
152
153 let manifest_list = ManifestListReader::new(
154 snapshot_ref,
155 FileIO::new_with_fs(),
156 metadata_ref,
157 Some(Arc::new(mgr)),
158 )
159 .load()
160 .await
161 .unwrap();
162 assert_eq!(manifest_list.entries().len(), 0);
163 }
164}