iceberg/encryption/
key_metadata.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Avro-serialized key metadata format compatible with Java's
19//! `org.apache.iceberg.encryption.StandardKeyMetadata`.
20
21use std::fmt;
22
23use super::SensitiveBytes;
24use crate::{Error, ErrorKind, Result};
25
26/// Standard key metadata for Iceberg table encryption.
27///
28/// Contains the Data Encryption Key (DEK), AAD prefix, and optional file
29/// length. Byte-compatible with Java's `StandardKeyMetadata` via Avro
30/// serialization.
31///
32/// Wire format: `[version byte (0x01)] [Avro binary datum]`
33#[derive(Clone, PartialEq, Eq)]
34pub struct StandardKeyMetadata {
35    encryption_key: SensitiveBytes,
36    aad_prefix: Option<Box<[u8]>>,
37    file_length: Option<u64>,
38}
39
40impl fmt::Debug for StandardKeyMetadata {
41    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42        f.debug_struct("StandardKeyMetadata")
43            .field("encryption_key", &self.encryption_key)
44            .field(
45                "aad_prefix",
46                &self
47                    .aad_prefix
48                    .as_ref()
49                    .map(|b| format!("[{} bytes]", b.len())),
50            )
51            .field("file_length", &self.file_length)
52            .finish()
53    }
54}
55
56impl StandardKeyMetadata {
57    /// Creates a new `StandardKeyMetadata`.
58    pub fn new(encryption_key: &[u8]) -> Self {
59        Self {
60            encryption_key: SensitiveBytes::new(encryption_key),
61            aad_prefix: None,
62            file_length: None,
63        }
64    }
65
66    /// Adds an AAD prefix.
67    pub fn with_aad_prefix(mut self, aad_prefix: &[u8]) -> Self {
68        self.aad_prefix = Some(aad_prefix.into());
69        self
70    }
71
72    /// Adds a file length.
73    pub fn with_file_length(mut self, length: u64) -> Self {
74        self.file_length = Some(length);
75        self
76    }
77
78    /// Returns the plaintext Data Encryption Key.
79    pub fn encryption_key(&self) -> &SensitiveBytes {
80        &self.encryption_key
81    }
82
83    /// Returns the AAD prefix.
84    pub fn aad_prefix(&self) -> Option<&[u8]> {
85        self.aad_prefix.as_deref()
86    }
87
88    /// Returns the optional file length.
89    pub fn file_length(&self) -> Option<u64> {
90        self.file_length
91    }
92
93    /// Encodes to Java-compatible format: `[0x01] [Avro binary datum]`
94    pub fn encode(&self) -> Result<Box<[u8]>> {
95        _serde::StandardKeyMetadataV1::from(self).encode()
96    }
97
98    /// Decodes from Java-compatible format.
99    pub fn decode(bytes: &[u8]) -> Result<Self> {
100        _serde::StandardKeyMetadataV1::decode(bytes).map(Self::from)
101    }
102}
103
104mod _serde {
105    use std::io::Cursor;
106    use std::sync::{Arc, LazyLock};
107
108    use apache_avro::{Schema as AvroSchema, from_avro_datum, from_value, to_avro_datum, to_value};
109    use serde::{Deserialize, Serialize};
110
111    use super::*;
112    use crate::avro::schema_to_avro_schema;
113    use crate::spec::{NestedField, PrimitiveType, Schema, Type};
114
115    pub(super) const V1: u8 = 1;
116
117    /// Avro schema for StandardKeyMetadata V1, derived from Iceberg schema.
118    pub(super) static AVRO_SCHEMA_V1: LazyLock<AvroSchema> = LazyLock::new(|| {
119        let schema = Schema::builder()
120            .with_fields(vec![
121                Arc::new(NestedField::required(
122                    0,
123                    "encryption_key",
124                    Type::Primitive(PrimitiveType::Binary),
125                )),
126                Arc::new(NestedField::optional(
127                    1,
128                    "aad_prefix",
129                    Type::Primitive(PrimitiveType::Binary),
130                )),
131                Arc::new(NestedField::optional(
132                    2,
133                    "file_length",
134                    Type::Primitive(PrimitiveType::Long),
135                )),
136            ])
137            .build()
138            .expect("Failed to build StandardKeyMetadata Iceberg schema");
139
140        schema_to_avro_schema("StandardKeyMetadata", &schema)
141            .expect("Failed to convert StandardKeyMetadata to Avro schema")
142    });
143
144    /// Serde struct for Avro serialization of [`StandardKeyMetadata`] V1.
145    /// Field names must match [`AVRO_SCHEMA_V1`] exactly.
146    #[derive(Serialize, Deserialize)]
147    pub(super) struct StandardKeyMetadataV1 {
148        pub encryption_key: serde_bytes::ByteBuf,
149        pub aad_prefix: Option<serde_bytes::ByteBuf>,
150        pub file_length: Option<u64>,
151    }
152
153    impl StandardKeyMetadataV1 {
154        pub(super) fn encode(&self) -> Result<Box<[u8]>> {
155            let value = to_value(self)
156                .and_then(|v| v.resolve(&AVRO_SCHEMA_V1))
157                .map_err(|e| {
158                    Error::new(ErrorKind::Unexpected, "Failed to encode key metadata")
159                        .with_source(e)
160                })?;
161
162            let datum = to_avro_datum(&AVRO_SCHEMA_V1, value).map_err(|e| {
163                Error::new(ErrorKind::Unexpected, "Failed to encode key metadata").with_source(e)
164            })?;
165
166            let mut result = Vec::with_capacity(1 + datum.len());
167            result.push(V1);
168            result.extend_from_slice(&datum);
169            Ok(result.into_boxed_slice())
170        }
171
172        pub(super) fn decode(bytes: &[u8]) -> Result<Self> {
173            if bytes.is_empty() {
174                return Err(Error::new(
175                    ErrorKind::DataInvalid,
176                    "Empty key metadata buffer",
177                ));
178            }
179
180            let version = bytes[0];
181            if version != V1 {
182                return Err(Error::new(
183                    ErrorKind::FeatureUnsupported,
184                    format!("Cannot resolve schema for version: {version}"),
185                ));
186            }
187
188            let mut reader = Cursor::new(&bytes[1..]);
189            let value = from_avro_datum(&AVRO_SCHEMA_V1, &mut reader, None).map_err(|e| {
190                Error::new(ErrorKind::DataInvalid, "Failed to decode key metadata").with_source(e)
191            })?;
192
193            from_value(&value).map_err(|e| {
194                Error::new(
195                    ErrorKind::DataInvalid,
196                    "Failed to decode key metadata fields",
197                )
198                .with_source(e)
199            })
200        }
201    }
202
203    impl From<&StandardKeyMetadata> for StandardKeyMetadataV1 {
204        fn from(metadata: &StandardKeyMetadata) -> Self {
205            Self {
206                encryption_key: serde_bytes::ByteBuf::from(metadata.encryption_key.as_bytes()),
207                aad_prefix: metadata
208                    .aad_prefix
209                    .as_ref()
210                    .map(|b| serde_bytes::ByteBuf::from(b.as_ref())),
211                file_length: metadata.file_length,
212            }
213        }
214    }
215
216    impl From<StandardKeyMetadataV1> for StandardKeyMetadata {
217        fn from(v1: StandardKeyMetadataV1) -> Self {
218            Self {
219                encryption_key: SensitiveBytes::new(v1.encryption_key.into_vec()),
220                aad_prefix: v1.aad_prefix.map(|b| b.into_vec().into_boxed_slice()),
221                file_length: v1.file_length,
222            }
223        }
224    }
225}
226
#[cfg(test)]
mod tests {
    use super::*;

    /// 16-byte key shared by the round-trip tests.
    const KEY: &[u8; 16] = b"0123456789012345";
    /// 16-byte AAD prefix shared by the round-trip tests.
    const AAD: &[u8; 16] = b"1234567890123456";

    /// Encodes `metadata` and immediately decodes the produced bytes.
    fn roundtrip(metadata: &StandardKeyMetadata) -> StandardKeyMetadata {
        let wire = metadata.encode().unwrap();
        StandardKeyMetadata::decode(&wire).unwrap()
    }

    #[test]
    fn test_roundtrip() {
        let parsed = roundtrip(&StandardKeyMetadata::new(KEY).with_aad_prefix(AAD));

        assert_eq!(parsed.encryption_key().as_bytes(), KEY);
        assert_eq!(parsed.aad_prefix(), Some(AAD.as_slice()));
        assert_eq!(parsed.file_length(), None);
    }

    #[test]
    fn test_roundtrip_with_length() {
        let file_length: u64 = 100_000;
        let original = StandardKeyMetadata::new(KEY)
            .with_aad_prefix(AAD)
            .with_file_length(file_length);

        let parsed = roundtrip(&original);

        assert_eq!(parsed.encryption_key().as_bytes(), KEY);
        assert_eq!(parsed.aad_prefix(), Some(AAD.as_slice()));
        assert_eq!(parsed.file_length(), Some(file_length));
    }

    #[test]
    fn test_unsupported_version() {
        // A buffer whose only byte is an unknown version must be rejected.
        let outcome = StandardKeyMetadata::decode(&[0x02]);
        assert!(outcome.is_err());
        assert_eq!(outcome.unwrap_err().kind(), ErrorKind::FeatureUnsupported);
    }

    #[test]
    fn test_empty_buffer() {
        let outcome = StandardKeyMetadata::decode(&[]);
        assert!(outcome.is_err());
        assert_eq!(outcome.unwrap_err().kind(), ErrorKind::DataInvalid);
    }

    #[test]
    fn test_roundtrip_without_aad() {
        let parsed = roundtrip(&StandardKeyMetadata::new(&[1, 2, 3, 4]));

        assert_eq!(parsed.encryption_key().as_bytes(), &[1, 2, 3, 4]);
        assert_eq!(parsed.aad_prefix(), None);
    }
}