iceberg/encryption/
key_metadata.rs1use std::fmt;
22
23use super::SensitiveBytes;
24use crate::{Error, ErrorKind, Result};
25
26#[derive(Clone, PartialEq, Eq)]
34pub struct StandardKeyMetadata {
35 encryption_key: SensitiveBytes,
36 aad_prefix: Option<Box<[u8]>>,
37 file_length: Option<u64>,
38}
39
40impl fmt::Debug for StandardKeyMetadata {
41 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42 f.debug_struct("StandardKeyMetadata")
43 .field("encryption_key", &self.encryption_key)
44 .field(
45 "aad_prefix",
46 &self
47 .aad_prefix
48 .as_ref()
49 .map(|b| format!("[{} bytes]", b.len())),
50 )
51 .field("file_length", &self.file_length)
52 .finish()
53 }
54}
55
56impl StandardKeyMetadata {
57 pub fn new(encryption_key: &[u8]) -> Self {
59 Self {
60 encryption_key: SensitiveBytes::new(encryption_key),
61 aad_prefix: None,
62 file_length: None,
63 }
64 }
65
66 pub fn with_aad_prefix(mut self, aad_prefix: &[u8]) -> Self {
68 self.aad_prefix = Some(aad_prefix.into());
69 self
70 }
71
72 pub fn with_file_length(mut self, length: u64) -> Self {
74 self.file_length = Some(length);
75 self
76 }
77
78 pub fn encryption_key(&self) -> &SensitiveBytes {
80 &self.encryption_key
81 }
82
83 pub fn aad_prefix(&self) -> Option<&[u8]> {
85 self.aad_prefix.as_deref()
86 }
87
88 pub fn file_length(&self) -> Option<u64> {
90 self.file_length
91 }
92
93 pub fn encode(&self) -> Result<Box<[u8]>> {
95 _serde::StandardKeyMetadataV1::from(self).encode()
96 }
97
98 pub fn decode(bytes: &[u8]) -> Result<Self> {
100 _serde::StandardKeyMetadataV1::decode(bytes).map(Self::from)
101 }
102}
103
104mod _serde {
105 use std::io::Cursor;
106 use std::sync::{Arc, LazyLock};
107
108 use apache_avro::{Schema as AvroSchema, from_avro_datum, from_value, to_avro_datum, to_value};
109 use serde::{Deserialize, Serialize};
110
111 use super::*;
112 use crate::avro::schema_to_avro_schema;
113 use crate::spec::{NestedField, PrimitiveType, Schema, Type};
114
115 pub(super) const V1: u8 = 1;
116
117 pub(super) static AVRO_SCHEMA_V1: LazyLock<AvroSchema> = LazyLock::new(|| {
119 let schema = Schema::builder()
120 .with_fields(vec![
121 Arc::new(NestedField::required(
122 0,
123 "encryption_key",
124 Type::Primitive(PrimitiveType::Binary),
125 )),
126 Arc::new(NestedField::optional(
127 1,
128 "aad_prefix",
129 Type::Primitive(PrimitiveType::Binary),
130 )),
131 Arc::new(NestedField::optional(
132 2,
133 "file_length",
134 Type::Primitive(PrimitiveType::Long),
135 )),
136 ])
137 .build()
138 .expect("Failed to build StandardKeyMetadata Iceberg schema");
139
140 schema_to_avro_schema("StandardKeyMetadata", &schema)
141 .expect("Failed to convert StandardKeyMetadata to Avro schema")
142 });
143
144 #[derive(Serialize, Deserialize)]
147 pub(super) struct StandardKeyMetadataV1 {
148 pub encryption_key: serde_bytes::ByteBuf,
149 pub aad_prefix: Option<serde_bytes::ByteBuf>,
150 pub file_length: Option<u64>,
151 }
152
153 impl StandardKeyMetadataV1 {
154 pub(super) fn encode(&self) -> Result<Box<[u8]>> {
155 let value = to_value(self)
156 .and_then(|v| v.resolve(&AVRO_SCHEMA_V1))
157 .map_err(|e| {
158 Error::new(ErrorKind::Unexpected, "Failed to encode key metadata")
159 .with_source(e)
160 })?;
161
162 let datum = to_avro_datum(&AVRO_SCHEMA_V1, value).map_err(|e| {
163 Error::new(ErrorKind::Unexpected, "Failed to encode key metadata").with_source(e)
164 })?;
165
166 let mut result = Vec::with_capacity(1 + datum.len());
167 result.push(V1);
168 result.extend_from_slice(&datum);
169 Ok(result.into_boxed_slice())
170 }
171
172 pub(super) fn decode(bytes: &[u8]) -> Result<Self> {
173 if bytes.is_empty() {
174 return Err(Error::new(
175 ErrorKind::DataInvalid,
176 "Empty key metadata buffer",
177 ));
178 }
179
180 let version = bytes[0];
181 if version != V1 {
182 return Err(Error::new(
183 ErrorKind::FeatureUnsupported,
184 format!("Cannot resolve schema for version: {version}"),
185 ));
186 }
187
188 let mut reader = Cursor::new(&bytes[1..]);
189 let value = from_avro_datum(&AVRO_SCHEMA_V1, &mut reader, None).map_err(|e| {
190 Error::new(ErrorKind::DataInvalid, "Failed to decode key metadata").with_source(e)
191 })?;
192
193 from_value(&value).map_err(|e| {
194 Error::new(
195 ErrorKind::DataInvalid,
196 "Failed to decode key metadata fields",
197 )
198 .with_source(e)
199 })
200 }
201 }
202
203 impl From<&StandardKeyMetadata> for StandardKeyMetadataV1 {
204 fn from(metadata: &StandardKeyMetadata) -> Self {
205 Self {
206 encryption_key: serde_bytes::ByteBuf::from(metadata.encryption_key.as_bytes()),
207 aad_prefix: metadata
208 .aad_prefix
209 .as_ref()
210 .map(|b| serde_bytes::ByteBuf::from(b.as_ref())),
211 file_length: metadata.file_length,
212 }
213 }
214 }
215
216 impl From<StandardKeyMetadataV1> for StandardKeyMetadata {
217 fn from(v1: StandardKeyMetadataV1) -> Self {
218 Self {
219 encryption_key: SensitiveBytes::new(v1.encryption_key.into_vec()),
220 aad_prefix: v1.aad_prefix.map(|b| b.into_vec().into_boxed_slice()),
221 file_length: v1.file_length,
222 }
223 }
224 }
225}
226
#[cfg(test)]
mod tests {
    use super::*;

    /// Key bytes shared by the round-trip tests.
    const KEY: &[u8; 16] = b"0123456789012345";
    /// AAD prefix bytes shared by the round-trip tests.
    const AAD: &[u8; 16] = b"1234567890123456";

    /// Encodes `metadata` and decodes the result, panicking if either step fails.
    fn roundtrip(metadata: &StandardKeyMetadata) -> StandardKeyMetadata {
        let serialized = metadata.encode().unwrap();
        StandardKeyMetadata::decode(&serialized).unwrap()
    }

    #[test]
    fn test_roundtrip() {
        let parsed = roundtrip(&StandardKeyMetadata::new(KEY).with_aad_prefix(AAD));

        assert_eq!(parsed.encryption_key().as_bytes(), KEY);
        assert_eq!(parsed.aad_prefix(), Some(AAD.as_slice()));
        assert_eq!(parsed.file_length(), None);
    }

    #[test]
    fn test_roundtrip_with_length() {
        let file_length: u64 = 100_000;
        let original = StandardKeyMetadata::new(KEY)
            .with_aad_prefix(AAD)
            .with_file_length(file_length);

        let parsed = roundtrip(&original);

        assert_eq!(parsed.encryption_key().as_bytes(), KEY);
        assert_eq!(parsed.aad_prefix(), Some(AAD.as_slice()));
        assert_eq!(parsed.file_length(), Some(file_length));
    }

    #[test]
    fn test_unsupported_version() {
        // A buffer whose version byte is 2 must be rejected before any payload parsing.
        let err = StandardKeyMetadata::decode(&[0x02]).expect_err("version 2 must be rejected");
        assert_eq!(err.kind(), ErrorKind::FeatureUnsupported);
    }

    #[test]
    fn test_empty_buffer() {
        let err = StandardKeyMetadata::decode(&[]).expect_err("empty buffer must be rejected");
        assert_eq!(err.kind(), ErrorKind::DataInvalid);
    }

    #[test]
    fn test_roundtrip_without_aad() {
        let parsed = roundtrip(&StandardKeyMetadata::new(&[1, 2, 3, 4]));

        assert_eq!(parsed.encryption_key().as_bytes(), &[1, 2, 3, 4]);
        assert_eq!(parsed.aad_prefix(), None);
    }
}