1use std::collections::HashMap;
27use std::fmt;
28use std::sync::{Arc, RwLock};
29use std::time::Duration;
30
31use aes_gcm::aead::OsRng;
32use aes_gcm::aead::rand_core::RngCore;
33use chrono::Utc;
34use moka::future::Cache;
35use uuid::Uuid;
36
/// Number of milliseconds in one day (24 h x 60 min x 60 s x 1000 ms).
const MILLIS_IN_DAY: i64 = 86_400_000;
38
39use super::crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes};
40use super::io::EncryptedOutputFile;
41use super::key_metadata::StandardKeyMetadata;
42use super::kms::KeyManagementClient;
43use crate::io::OutputFile;
44use crate::spec::{EncryptedKey, FormatVersion, TableMetadataRef};
45use crate::{Error, ErrorKind, Result};
46
/// Property key under which a KEK entry records its creation time.
///
/// `create_kek` stores the creation time here as a decimal string of epoch
/// milliseconds. The bytes of that same string are used as the AAD when key
/// metadata is wrapped or unwrapped with the KEK.
pub const KEK_CREATED_AT_PROPERTY: &str = "KEY_TIMESTAMP";

/// Age, in days, at which a KEK is treated as expired (two 365-day years).
const DEFAULT_KEK_LIFESPAN_DAYS: i64 = 2 * 365;

/// Time-to-live of entries in the cache of unwrapped KEKs (one hour).
const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(60 * 60);

/// Length, in bytes, of the random AAD prefix generated for each encrypted file.
const AAD_PREFIX_LENGTH: usize = 16;
60
/// Manages the encryption keys of a single table.
///
/// The manager keeps a map of [`EncryptedKey`] entries keyed by key id. Some
/// entries are key-encryption keys (KEKs) wrapped by the KMS master key named
/// by `table_key_id`; the others are key-metadata blobs wrapped by one of
/// those KEKs. Unwrapped KEK bytes are cached in memory for
/// `DEFAULT_CACHE_TTL`.
///
/// Instances are created through the generated builder. `kms_client` and
/// `table_key_id` are required; `key_size` is optional; the key map can only
/// be populated through the `add_encryption_key` / `encryption_keys` mutators.
#[derive(typed_builder::TypedBuilder)]
#[builder(mutators(
    // Adds (or replaces) a single key, indexed by the key's own id.
    pub fn add_encryption_key(&mut self, key: EncryptedKey) {
        self.encryption_keys
            .write()
            .expect("encryption_keys lock poisoned")
            .insert(key.key_id().to_string(), key);
    }
    // Replaces the whole key map, discarding keys added earlier on the builder.
    pub fn encryption_keys(&mut self, keys: HashMap<String, EncryptedKey>) {
        self.encryption_keys = RwLock::new(keys);
    }
))]
pub struct EncryptionManager {
    /// KMS client used to wrap and unwrap KEKs and, when it supports it, to
    /// generate them.
    kms_client: Arc<dyn KeyManagementClient>,
    /// Cache of unwrapped KEK bytes keyed by KEK id. Not settable through the
    /// builder; entries expire after `DEFAULT_CACHE_TTL`.
    #[builder(
        default = Cache::builder().time_to_live(DEFAULT_CACHE_TTL).build(),
        setter(skip)
    )]
    kek_cache: Cache<String, SensitiveBytes>,
    /// AES key size used for generated data keys and locally generated KEKs.
    #[builder(default = AesKeySize::default())]
    key_size: AesKeySize,
    /// Id of the KMS master key that wraps this table's KEKs.
    #[builder(setter(into))]
    table_key_id: String,
    /// All known encrypted keys, indexed by key id. Starts empty unless filled
    /// through the builder mutators.
    #[builder(default = RwLock::new(HashMap::new()), via_mutators)]
    encryption_keys: RwLock<HashMap<String, EncryptedKey>>,
}
97
98impl fmt::Debug for EncryptionManager {
99 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100 f.debug_struct("EncryptionManager")
101 .field("key_size", &self.key_size)
102 .field("table_key_id", &self.table_key_id)
103 .finish_non_exhaustive()
104 }
105}
106
107impl EncryptionManager {
108 pub(crate) fn from_table_metadata(
114 kms_client: Option<&Arc<dyn KeyManagementClient>>,
115 metadata: &TableMetadataRef,
116 ) -> Result<Option<Arc<Self>>> {
117 if metadata.format_version() < FormatVersion::V3 {
118 return Ok(None);
119 }
120
121 let table_properties = metadata.table_properties()?;
122 let Some(table_key_id) = table_properties.encryption_key_id else {
123 if kms_client.is_some() {
124 tracing::warn!(
125 "KeyManagementClient provided but table does not have encryption.key-id set"
126 );
127 }
128 return Ok(None);
129 };
130
131 let kms_client = kms_client.ok_or_else(|| {
132 Error::new(
133 ErrorKind::PreconditionFailed,
134 "Table has encryption.key-id set but no KeyManagementClient was provided to TableBuilder",
135 )
136 })?;
137
138 let em = EncryptionManager::builder()
139 .kms_client(Arc::clone(kms_client))
140 .table_key_id(table_key_id)
141 .encryption_keys(metadata.encryption_keys.clone())
142 .key_size(AesKeySize::from_key_length(
143 table_properties.encryption_data_key_length,
144 )?)
145 .build();
146 Ok(Some(Arc::new(em)))
147 }
148
149 pub fn encrypt(&self, raw_output: OutputFile) -> EncryptedOutputFile {
154 let dek = SecureKey::generate(self.key_size);
155 let aad_prefix = Self::generate_aad_prefix();
156 let metadata = StandardKeyMetadata::new(dek.as_bytes()).with_aad_prefix(&aad_prefix);
157 EncryptedOutputFile::new(raw_output, metadata)
158 }
159
160 pub async fn encrypt_manifest_list_key_metadata(
169 &self,
170 key_metadata: &StandardKeyMetadata,
171 ) -> Result<String> {
172 let kek = match self.find_active_kek()? {
173 Some(existing) => existing,
174 None => self.create_kek().await?,
175 };
176
177 let kek_bytes = self.unwrap_key_encryption_key(&kek).await?;
178
179 let aad = Self::kek_timestamp_aad(&kek)?;
181 let serialized = key_metadata.encode()?;
182 let wrapped_metadata = self.wrap_dek_with_kek(&serialized, &kek_bytes, Some(aad))?;
183
184 let wrapped_key = EncryptedKey::builder()
185 .key_id(Uuid::new_v4().to_string())
186 .encrypted_key_metadata(wrapped_metadata)
187 .encrypted_by_id(kek.key_id())
188 .build();
189
190 let wrapped_key_id = wrapped_key.key_id().to_string();
191 self.insert_encryption_key(wrapped_key);
192 Ok(wrapped_key_id)
193 }
194
195 pub async fn decrypt_manifest_list_key_metadata(
201 &self,
202 encryption_key_id: &str,
203 ) -> Result<StandardKeyMetadata> {
204 let encrypted_key = self
205 .encryption_keys
206 .read()
207 .expect("encryption_keys lock poisoned")
208 .get(encryption_key_id)
209 .cloned()
210 .ok_or_else(|| {
211 Error::new(
212 ErrorKind::DataInvalid,
213 format!("Encryption key '{encryption_key_id}' not found"),
214 )
215 })?;
216
217 let kek_key_id = encrypted_key.encrypted_by_id().ok_or_else(|| {
218 Error::new(
219 ErrorKind::DataInvalid,
220 format!(
221 "EncryptedKey '{}' has no encrypted_by_id",
222 encrypted_key.key_id()
223 ),
224 )
225 })?;
226
227 let bytes = self
228 .decrypt_dek(kek_key_id, encrypted_key.encrypted_key_metadata())
229 .await?;
230
231 StandardKeyMetadata::decode(bytes.as_bytes())
232 }
233
234 pub fn with_encryption_keys<F, R>(&self, f: F) -> R
239 where F: FnOnce(&HashMap<String, EncryptedKey>) -> R {
240 let keys = self
241 .encryption_keys
242 .read()
243 .expect("encryption_keys lock poisoned");
244 f(&keys)
245 }
246
247 fn insert_encryption_key(&self, key: EncryptedKey) {
248 self.encryption_keys
249 .write()
250 .expect("encryption_keys lock poisoned")
251 .insert(key.key_id().to_string(), key);
252 }
253
254 async fn create_kek(&self) -> Result<EncryptedKey> {
257 let (plaintext_kek, wrapped_kek) = if self.kms_client.supports_key_generation() {
258 let result = self.kms_client.generate_key(&self.table_key_id).await?;
259 (result.key().clone(), result.wrapped_key().to_vec())
260 } else {
261 let plaintext_key = SecureKey::generate(self.key_size);
262 let wrapped = self
263 .kms_client
264 .wrap_key(plaintext_key.as_bytes(), &self.table_key_id)
265 .await?;
266
267 (SensitiveBytes::new(plaintext_key.as_bytes()), wrapped)
268 };
269
270 let key_id = Uuid::new_v4().to_string();
271 let now_ms = Utc::now().timestamp_millis();
272
273 let mut properties = HashMap::new();
274 properties.insert(KEK_CREATED_AT_PROPERTY.to_string(), now_ms.to_string());
275
276 self.kek_cache.insert(key_id.clone(), plaintext_kek).await;
277
278 let kek = EncryptedKey::builder()
279 .key_id(key_id)
280 .encrypted_key_metadata(wrapped_kek)
281 .encrypted_by_id(&self.table_key_id)
282 .properties(properties)
283 .build();
284
285 self.insert_encryption_key(kek.clone());
286 Ok(kek)
287 }
288
289 fn is_kek_expired(&self, kek: &EncryptedKey) -> bool {
291 let created_at_ms = match kek
292 .properties()
293 .get(KEK_CREATED_AT_PROPERTY)
294 .and_then(|ts| ts.parse::<i64>().ok())
295 {
296 Some(ts) => ts,
297 None => return true, };
299
300 let now_ms = Utc::now().timestamp_millis();
301 let lifespan_ms = DEFAULT_KEK_LIFESPAN_DAYS * MILLIS_IN_DAY;
302 (now_ms - created_at_ms) >= lifespan_ms
303 }
304
305 fn find_active_kek(&self) -> Result<Option<EncryptedKey>> {
307 let keys = self
308 .encryption_keys
309 .read()
310 .expect("encryption_keys lock poisoned");
311 Ok(keys
312 .values()
313 .filter(|kek| {
314 kek.encrypted_by_id()
315 .map(|id| id == self.table_key_id)
316 .unwrap_or(false)
317 && !self.is_kek_expired(kek)
318 })
319 .max_by_key(|kek| {
320 kek.properties()
321 .get(KEK_CREATED_AT_PROPERTY)
322 .and_then(|ts| ts.parse::<i64>().ok())
323 .unwrap_or(0)
324 })
325 .cloned())
326 }
327
328 async fn unwrap_key_encryption_key(&self, kek: &EncryptedKey) -> Result<SensitiveBytes> {
330 let cache_key = kek.key_id().to_string();
331
332 if let Some(cached) = self.kek_cache.get(&cache_key).await {
333 return Ok(cached);
334 }
335
336 let master_key_id = kek.encrypted_by_id().ok_or_else(|| {
337 Error::new(
338 ErrorKind::DataInvalid,
339 format!("KEK '{}' has no encrypted_by_id", kek.key_id()),
340 )
341 })?;
342
343 let plaintext = self
344 .kms_client
345 .unwrap_key(kek.encrypted_key_metadata(), master_key_id)
346 .await?;
347
348 self.kek_cache.insert(cache_key, plaintext.clone()).await;
349
350 Ok(plaintext)
351 }
352
353 async fn decrypt_dek(&self, kek_key_id: &str, wrapped_dek: &[u8]) -> Result<SensitiveBytes> {
356 let kek = self
357 .encryption_keys
358 .read()
359 .expect("encryption_keys lock poisoned")
360 .get(kek_key_id)
361 .cloned()
362 .ok_or_else(|| {
363 Error::new(
364 ErrorKind::DataInvalid,
365 format!("KEK not found in encryption keys: {kek_key_id}"),
366 )
367 })?;
368
369 let aad = Self::kek_timestamp_aad(&kek)?;
371
372 let kek_bytes = self.unwrap_key_encryption_key(&kek).await?;
373 self.unwrap_dek_with_kek(wrapped_dek, &kek_bytes, Some(aad))
374 .map_err(|e| {
375 Error::new(
376 e.kind(),
377 format!("Failed to unwrap key metadata with KEK '{kek_key_id}'"),
378 )
379 .with_source(e)
380 })
381 }
382
383 fn kek_timestamp_aad(kek: &EncryptedKey) -> Result<&[u8]> {
385 kek.properties()
386 .get(KEK_CREATED_AT_PROPERTY)
387 .map(|ts| ts.as_bytes())
388 .ok_or_else(|| {
389 Error::new(
390 ErrorKind::DataInvalid,
391 format!(
392 "KEK '{}' is missing required '{}' property",
393 kek.key_id(),
394 KEK_CREATED_AT_PROPERTY
395 ),
396 )
397 })
398 }
399
400 fn generate_aad_prefix() -> Box<[u8]> {
402 let mut prefix = vec![0u8; AAD_PREFIX_LENGTH];
403 OsRng.fill_bytes(&mut prefix);
404 prefix.into_boxed_slice()
405 }
406
407 fn wrap_dek_with_kek(
409 &self,
410 dek: &[u8],
411 kek: &SensitiveBytes,
412 aad: Option<&[u8]>,
413 ) -> Result<Vec<u8>> {
414 let key = SecureKey::try_from(kek.clone())?;
415 let cipher = AesGcmCipher::new(key);
416 cipher.encrypt(dek, aad)
417 }
418
419 fn unwrap_dek_with_kek(
421 &self,
422 wrapped_dek: &[u8],
423 kek: &SensitiveBytes,
424 aad: Option<&[u8]>,
425 ) -> Result<SensitiveBytes> {
426 let key = SecureKey::try_from(kek.clone())?;
427 let cipher = AesGcmCipher::new(key);
428 cipher.decrypt(wrapped_dek, aad).map(SensitiveBytes::new)
429 }
430}
431
432#[cfg(test)]
433mod tests {
434 use super::*;
435 use crate::encryption::EncryptedInputFile;
436 use crate::encryption::kms::MemoryKeyManagementClient;
437
438 fn create_test_kms() -> Arc<dyn KeyManagementClient> {
439 let kms = MemoryKeyManagementClient::new();
440 kms.add_master_key("master-1").unwrap();
441 Arc::new(kms)
442 }
443
444 fn create_test_manager() -> EncryptionManager {
445 EncryptionManager::builder()
446 .kms_client(create_test_kms())
447 .table_key_id("master-1")
448 .build()
449 }
450
451 #[tokio::test]
452 async fn test_create_kek() {
453 let mgr = create_test_manager();
454 let kek = mgr.create_kek().await.unwrap();
455
456 assert!(!kek.key_id().is_empty());
457 assert!(!kek.encrypted_key_metadata().is_empty());
458 assert_eq!(kek.encrypted_by_id(), Some("master-1"));
459 assert!(kek.properties().contains_key(KEK_CREATED_AT_PROPERTY));
460 }
461
462 fn sample_key_metadata() -> StandardKeyMetadata {
463 StandardKeyMetadata::new(b"0123456789abcdef").with_aad_prefix(b"test-aad-prefix!")
464 }
465
466 #[tokio::test]
467 async fn test_wrap_unwrap_key_metadata_roundtrip() {
468 let mgr = create_test_manager();
469 let plaintext = sample_key_metadata();
470
471 let key_id = mgr
472 .encrypt_manifest_list_key_metadata(&plaintext)
473 .await
474 .unwrap();
475
476 assert_eq!(mgr.with_encryption_keys(|k| k.len()), 2);
478
479 let decrypted = mgr
480 .decrypt_manifest_list_key_metadata(&key_id)
481 .await
482 .unwrap();
483 assert_eq!(decrypted, plaintext);
484 }
485
486 #[tokio::test]
487 async fn test_kek_reuse_when_not_expired() {
488 let mgr = create_test_manager();
489
490 let _id1 = mgr
492 .encrypt_manifest_list_key_metadata(&sample_key_metadata())
493 .await
494 .unwrap();
495 let kek_id = mgr.with_encryption_keys(|keys| {
496 assert_eq!(keys.len(), 2);
497 keys.values()
498 .find(|k| k.encrypted_by_id() == Some("master-1"))
499 .unwrap()
500 .key_id()
501 .to_string()
502 });
503
504 let id2 = mgr
506 .encrypt_manifest_list_key_metadata(&sample_key_metadata())
507 .await
508 .unwrap();
509 let entry2 = mgr.with_encryption_keys(|keys| {
510 assert_eq!(keys.len(), 3);
511 keys.get(&id2).cloned().unwrap()
512 });
513 assert_eq!(entry2.encrypted_by_id(), Some(kek_id.as_str()));
514 }
515
516 #[tokio::test]
517 async fn test_kek_rotation_when_expired() {
518 let kms = create_test_kms();
519
520 let three_years_ago_ms = Utc::now().timestamp_millis() - (3 * 365 * MILLIS_IN_DAY);
522 let mut properties = HashMap::new();
523 properties.insert(
524 KEK_CREATED_AT_PROPERTY.to_string(),
525 three_years_ago_ms.to_string(),
526 );
527
528 let kek_key = SecureKey::generate(AesKeySize::Bits128);
530 let wrapped = kms.wrap_key(kek_key.as_bytes(), "master-1").await.unwrap();
531
532 let old_kek = EncryptedKey::builder()
533 .key_id("expired-kek")
534 .encrypted_key_metadata(wrapped)
535 .encrypted_by_id("master-1")
536 .properties(properties)
537 .build();
538
539 let mgr = EncryptionManager::builder()
541 .kms_client(kms)
542 .table_key_id("master-1")
543 .add_encryption_key(old_kek.clone())
544 .build();
545
546 let new_entry_id = mgr
548 .encrypt_manifest_list_key_metadata(&sample_key_metadata())
549 .await
550 .unwrap();
551 let entry = mgr
552 .with_encryption_keys(|keys| keys.get(&new_entry_id).cloned())
553 .unwrap();
554 let used_kek_id = entry.encrypted_by_id().unwrap();
555 assert_ne!(used_kek_id, old_kek.key_id());
556 }
557
558 #[tokio::test]
559 async fn test_is_kek_expired_no_timestamp() {
560 let mgr = create_test_manager();
561
562 let kek = EncryptedKey::builder()
564 .key_id("no-ts")
565 .encrypted_key_metadata(vec![0u8; 32])
566 .build();
567
568 assert!(mgr.is_kek_expired(&kek));
569 }
570
571 #[tokio::test]
572 async fn test_decrypt_with_unknown_key_id() {
573 let mgr = create_test_manager();
574 let result = mgr.decrypt_manifest_list_key_metadata("nonexistent").await;
575 assert!(result.is_err());
576 }
577
578 #[tokio::test]
579 async fn test_kek_cache_hit() {
580 let mgr = create_test_manager();
581
582 let key_id = mgr
584 .encrypt_manifest_list_key_metadata(&sample_key_metadata())
585 .await
586 .unwrap();
587
588 let _ = mgr
590 .decrypt_manifest_list_key_metadata(&key_id)
591 .await
592 .unwrap();
593 }
594
595 #[tokio::test]
596 async fn test_unwrap_fails_when_kek_missing_timestamp() {
597 let mgr = create_test_manager();
598
599 let entry_id = mgr
601 .encrypt_manifest_list_key_metadata(&sample_key_metadata())
602 .await
603 .unwrap();
604
605 let mut keys = mgr.with_encryption_keys(|k| k.clone());
608 let kek_id = keys
609 .get(&entry_id)
610 .unwrap()
611 .encrypted_by_id()
612 .unwrap()
613 .to_string();
614 let kek = keys.remove(&kek_id).unwrap();
615 let kek_no_ts = EncryptedKey::builder()
616 .key_id(kek.key_id())
617 .encrypted_key_metadata(kek.encrypted_key_metadata())
618 .encrypted_by_id(kek.encrypted_by_id().unwrap())
619 .build();
620 keys.insert(kek_no_ts.key_id().to_string(), kek_no_ts);
621
622 let mgr = EncryptionManager::builder()
623 .kms_client(create_test_kms())
624 .table_key_id("master-1")
625 .encryption_keys(keys)
626 .build();
627
628 let result = mgr.decrypt_manifest_list_key_metadata(&entry_id).await;
629 assert!(result.is_err());
630 let err = result.unwrap_err();
631 assert_eq!(err.kind(), ErrorKind::DataInvalid);
632 assert!(
633 err.to_string().contains(KEK_CREATED_AT_PROPERTY),
634 "error should mention the missing property: {err}"
635 );
636 }
637
638 #[tokio::test]
639 async fn test_unwrap_fails_when_kek_timestamp_tampered() {
640 let mgr = create_test_manager();
641
642 let entry_id = mgr
644 .encrypt_manifest_list_key_metadata(&sample_key_metadata())
645 .await
646 .unwrap();
647
648 let mut keys = mgr.with_encryption_keys(|k| k.clone());
650 let kek_id = keys
651 .get(&entry_id)
652 .unwrap()
653 .encrypted_by_id()
654 .unwrap()
655 .to_string();
656 let kek = keys.remove(&kek_id).unwrap();
657 let mut tampered_properties = kek.properties().clone();
658 tampered_properties.insert(KEK_CREATED_AT_PROPERTY.to_string(), "9999999".to_string());
659 let tampered_kek = EncryptedKey::builder()
660 .key_id(kek.key_id())
661 .encrypted_key_metadata(kek.encrypted_key_metadata())
662 .encrypted_by_id(kek.encrypted_by_id().unwrap())
663 .properties(tampered_properties)
664 .build();
665 keys.insert(tampered_kek.key_id().to_string(), tampered_kek);
666
667 let mgr = EncryptionManager::builder()
668 .kms_client(create_test_kms())
669 .table_key_id("master-1")
670 .encryption_keys(keys)
671 .build();
672
673 let result = mgr.decrypt_manifest_list_key_metadata(&entry_id).await;
675 assert!(
676 result.is_err(),
677 "tampered timestamp should cause decryption failure"
678 );
679 }
680
681 #[tokio::test]
682 async fn test_encrypt_decrypt_roundtrip() {
683 use crate::io::FileIO;
684
685 let io = FileIO::new_with_memory();
686 let path = "memory:///test/encrypt_roundtrip.bin";
687
688 let kms = MemoryKeyManagementClient::new();
689 kms.add_master_key("master-1").unwrap();
690 let mgr = EncryptionManager::builder()
691 .kms_client(Arc::new(kms) as Arc<dyn KeyManagementClient>)
692 .table_key_id("master-1")
693 .build();
694
695 let output = io.new_output(path).unwrap();
696 let encrypted_output = mgr.encrypt(output);
697
698 let plaintext = b"Hello, encrypted Iceberg round-trip!";
699 let serialized_metadata = encrypted_output.key_metadata().encode().unwrap();
700 encrypted_output
701 .write(bytes::Bytes::from(plaintext.to_vec()))
702 .await
703 .unwrap();
704
705 let input = io.new_input(path).unwrap();
706 let parsed_metadata = StandardKeyMetadata::decode(&serialized_metadata).unwrap();
707 let decrypted_file = EncryptedInputFile::new(input, parsed_metadata);
708
709 let content = decrypted_file.read().await.unwrap();
710 assert_eq!(&content[..], plaintext);
711 }
712}