iceberg/encryption/
manager.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Encryption manager for file-level encryption and two-layer envelope key management.
19//!
20//! [`EncryptionManager`] provides file-level `decrypt` / `encrypt`
21//! operations matching Java's `org.apache.iceberg.encryption.EncryptionManager`,
22//! using envelope encryption:
23//! - A master key (in KMS) wraps a Key Encryption Key (KEK)
24//! - The KEK wraps Data Encryption Keys (DEKs) locally
25
26use std::collections::HashMap;
27use std::fmt;
28use std::sync::{Arc, RwLock};
29use std::time::Duration;
30
31use aes_gcm::aead::OsRng;
32use aes_gcm::aead::rand_core::RngCore;
33use chrono::Utc;
34use moka::future::Cache;
35use uuid::Uuid;
36
/// Number of milliseconds in one day.
/// Used to convert the KEK lifespan from days into milliseconds.
const MILLIS_IN_DAY: i64 = 24 * 60 * 60 * 1000;
38
39use super::crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes};
40use super::io::EncryptedOutputFile;
41use super::key_metadata::StandardKeyMetadata;
42use super::kms::KeyManagementClient;
43use crate::io::OutputFile;
44use crate::spec::{EncryptedKey, FormatVersion, TableMetadataRef};
45use crate::{Error, ErrorKind, Result};
46
/// Property key for the KEK creation timestamp (milliseconds since epoch).
/// Matches Java's `StandardEncryptionManager.KEY_TIMESTAMP`.
///
/// The stored string is also passed verbatim as the AAD when manifest-list
/// key metadata is wrapped or unwrapped with the KEK.
pub const KEK_CREATED_AT_PROPERTY: &str = "KEY_TIMESTAMP";

/// Default KEK lifespan in days, per NIST SP 800-57.
/// A KEK whose age reaches this lifespan is treated as expired and is not
/// reused for new wraps.
const DEFAULT_KEK_LIFESPAN_DAYS: i64 = 730;

/// Default cache TTL for unwrapped KEKs.
/// Applied as the time-to-live of the manager's in-memory KEK cache.
const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(3600);

/// Default AAD prefix length in bytes.
/// Matches Java's `TableProperties.ENCRYPTION_AAD_LENGTH_DEFAULT`.
const AAD_PREFIX_LENGTH: usize = 16;
60
/// File-level encryption manager using two-layer envelope encryption.
///
/// Uses an async cache for unwrapped KEK bytes to avoid repeated KMS calls.
///
/// Built through the generated `EncryptionManager::builder()`; `kms_client`
/// and `table_key_id` have no default and must be supplied.
#[derive(typed_builder::TypedBuilder)]
#[builder(mutators(
    /// Add an encryption key (KEK or wrapped key metadata entry).
    pub fn add_encryption_key(&mut self, key: EncryptedKey) {
        self.encryption_keys
            .write()
            .expect("encryption_keys lock poisoned")
            .insert(key.key_id().to_string(), key);
    }
    /// Set all encryption keys from table metadata.
    pub fn encryption_keys(&mut self, keys: HashMap<String, EncryptedKey>) {
        self.encryption_keys = RwLock::new(keys);
    }
))]
pub struct EncryptionManager {
    /// KMS client used to generate, wrap and unwrap KEKs under the table's master key.
    kms_client: Arc<dyn KeyManagementClient>,
    /// Cache of unwrapped (plaintext) KEK bytes keyed by KEK id.
    /// Entries expire after [`DEFAULT_CACHE_TTL`]; not settable through the builder.
    #[builder(
        default = Cache::builder().time_to_live(DEFAULT_CACHE_TTL).build(),
        setter(skip)
    )]
    kek_cache: Cache<String, SensitiveBytes>,
    /// AES key size for DEK generation. Defaults to 128-bit.
    #[builder(default = AesKeySize::default())]
    key_size: AesKeySize,
    /// Master key ID from table property `encryption.key-id`.
    #[builder(setter(into))]
    table_key_id: String,
    /// All encryption keys from table metadata (KEKs and wrapped key metadata entries).
    /// Newly created KEKs and wrapped manifest-list entries are inserted here so
    /// callers can snapshot the full set at commit time via
    /// [`EncryptionManager::with_encryption_keys`].
    #[builder(default = RwLock::new(HashMap::new()), via_mutators)]
    encryption_keys: RwLock<HashMap<String, EncryptedKey>>,
}
97
98impl fmt::Debug for EncryptionManager {
99    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100        f.debug_struct("EncryptionManager")
101            .field("key_size", &self.key_size)
102            .field("table_key_id", &self.table_key_id)
103            .finish_non_exhaustive()
104    }
105}
106
107impl EncryptionManager {
108    /// Attempt to construct an [`EncryptionManager`] from table metadata.
109    ///
110    /// Returns `Ok(None)` if the format version is below v3 or the
111    /// `encryption.key-id` property is not set. Returns an error if the
112    /// property is set but no [`KeyManagementClient`] was provided.
113    pub(crate) fn from_table_metadata(
114        kms_client: Option<&Arc<dyn KeyManagementClient>>,
115        metadata: &TableMetadataRef,
116    ) -> Result<Option<Arc<Self>>> {
117        if metadata.format_version() < FormatVersion::V3 {
118            return Ok(None);
119        }
120
121        let table_properties = metadata.table_properties()?;
122        let Some(table_key_id) = table_properties.encryption_key_id else {
123            if kms_client.is_some() {
124                tracing::warn!(
125                    "KeyManagementClient provided but table does not have encryption.key-id set"
126                );
127            }
128            return Ok(None);
129        };
130
131        let kms_client = kms_client.ok_or_else(|| {
132            Error::new(
133                ErrorKind::PreconditionFailed,
134                "Table has encryption.key-id set but no KeyManagementClient was provided to TableBuilder",
135            )
136        })?;
137
138        let em = EncryptionManager::builder()
139            .kms_client(Arc::clone(kms_client))
140            .table_key_id(table_key_id)
141            .encryption_keys(metadata.encryption_keys.clone())
142            .key_size(AesKeySize::from_key_length(
143                table_properties.encryption_data_key_length,
144            )?)
145            .build();
146        Ok(Some(Arc::new(em)))
147    }
148
149    /// Encrypt a file with AGS1 stream encryption.
150    ///
151    /// Returns an [`EncryptedOutputFile`] that transparently encrypts on
152    /// write, along with key metadata for later decryption.
153    pub fn encrypt(&self, raw_output: OutputFile) -> EncryptedOutputFile {
154        let dek = SecureKey::generate(self.key_size);
155        let aad_prefix = Self::generate_aad_prefix();
156        let metadata = StandardKeyMetadata::new(dek.as_bytes()).with_aad_prefix(&aad_prefix);
157        EncryptedOutputFile::new(raw_output, metadata)
158    }
159
160    /// Wrap a manifest list key metadata with a KEK for storage in table metadata.
161    ///
162    /// Stores the resulting wrapped entry (and any newly created KEK) in the
163    /// manager's internal `encryption_keys` map. Callers persist the full set
164    /// at commit time via [`Self::encryption_keys`].
165    ///
166    /// Returns the `key_id` of the wrapped entry, which should be recorded on
167    /// the snapshot as `encryption_key_id` so readers can locate it later.
168    pub async fn encrypt_manifest_list_key_metadata(
169        &self,
170        key_metadata: &StandardKeyMetadata,
171    ) -> Result<String> {
172        let kek = match self.find_active_kek()? {
173            Some(existing) => existing,
174            None => self.create_kek().await?,
175        };
176
177        let kek_bytes = self.unwrap_key_encryption_key(&kek).await?;
178
179        // Use the KEK timestamp as AAD to prevent timestamp tampering attacks.
180        let aad = Self::kek_timestamp_aad(&kek)?;
181        let serialized = key_metadata.encode()?;
182        let wrapped_metadata = self.wrap_dek_with_kek(&serialized, &kek_bytes, Some(aad))?;
183
184        let wrapped_key = EncryptedKey::builder()
185            .key_id(Uuid::new_v4().to_string())
186            .encrypted_key_metadata(wrapped_metadata)
187            .encrypted_by_id(kek.key_id())
188            .build();
189
190        let wrapped_key_id = wrapped_key.key_id().to_string();
191        self.insert_encryption_key(wrapped_key);
192        Ok(wrapped_key_id)
193    }
194
195    /// Decrypt a manifest list key metadata previously wrapped via
196    /// [`Self::encrypt_manifest_list_key_metadata`].
197    ///
198    /// Looks up the entry by `encryption_key_id` (typically read from the
199    /// snapshot) in the manager's `encryption_keys` map.
200    pub async fn decrypt_manifest_list_key_metadata(
201        &self,
202        encryption_key_id: &str,
203    ) -> Result<StandardKeyMetadata> {
204        let encrypted_key = self
205            .encryption_keys
206            .read()
207            .expect("encryption_keys lock poisoned")
208            .get(encryption_key_id)
209            .cloned()
210            .ok_or_else(|| {
211                Error::new(
212                    ErrorKind::DataInvalid,
213                    format!("Encryption key '{encryption_key_id}' not found"),
214                )
215            })?;
216
217        let kek_key_id = encrypted_key.encrypted_by_id().ok_or_else(|| {
218            Error::new(
219                ErrorKind::DataInvalid,
220                format!(
221                    "EncryptedKey '{}' has no encrypted_by_id",
222                    encrypted_key.key_id()
223                ),
224            )
225        })?;
226
227        let bytes = self
228            .decrypt_dek(kek_key_id, encrypted_key.encrypted_key_metadata())
229            .await?;
230
231        StandardKeyMetadata::decode(bytes.as_bytes())
232    }
233
234    /// Borrow the encryption keys held by this manager.
235    ///
236    /// Use at commit time to persist newly created KEKs and wrapped
237    /// manifest-list entries into `TableMetadata.encryption_keys`.
238    pub fn with_encryption_keys<F, R>(&self, f: F) -> R
239    where F: FnOnce(&HashMap<String, EncryptedKey>) -> R {
240        let keys = self
241            .encryption_keys
242            .read()
243            .expect("encryption_keys lock poisoned");
244        f(&keys)
245    }
246
247    fn insert_encryption_key(&self, key: EncryptedKey) {
248        self.encryption_keys
249            .write()
250            .expect("encryption_keys lock poisoned")
251            .insert(key.key_id().to_string(), key);
252    }
253
254    /// Create a new KEK, wrapped by the table's master key, and store it in
255    /// the manager's `encryption_keys` map.
256    async fn create_kek(&self) -> Result<EncryptedKey> {
257        let (plaintext_kek, wrapped_kek) = if self.kms_client.supports_key_generation() {
258            let result = self.kms_client.generate_key(&self.table_key_id).await?;
259            (result.key().clone(), result.wrapped_key().to_vec())
260        } else {
261            let plaintext_key = SecureKey::generate(self.key_size);
262            let wrapped = self
263                .kms_client
264                .wrap_key(plaintext_key.as_bytes(), &self.table_key_id)
265                .await?;
266
267            (SensitiveBytes::new(plaintext_key.as_bytes()), wrapped)
268        };
269
270        let key_id = Uuid::new_v4().to_string();
271        let now_ms = Utc::now().timestamp_millis();
272
273        let mut properties = HashMap::new();
274        properties.insert(KEK_CREATED_AT_PROPERTY.to_string(), now_ms.to_string());
275
276        self.kek_cache.insert(key_id.clone(), plaintext_kek).await;
277
278        let kek = EncryptedKey::builder()
279            .key_id(key_id)
280            .encrypted_key_metadata(wrapped_kek)
281            .encrypted_by_id(&self.table_key_id)
282            .properties(properties)
283            .build();
284
285        self.insert_encryption_key(kek.clone());
286        Ok(kek)
287    }
288
289    /// Check whether a KEK has exceeded its configured lifespan (730 days per NIST SP 800-57).
290    fn is_kek_expired(&self, kek: &EncryptedKey) -> bool {
291        let created_at_ms = match kek
292            .properties()
293            .get(KEK_CREATED_AT_PROPERTY)
294            .and_then(|ts| ts.parse::<i64>().ok())
295        {
296            Some(ts) => ts,
297            None => return true, // No timestamp -> treat as expired
298        };
299
300        let now_ms = Utc::now().timestamp_millis();
301        let lifespan_ms = DEFAULT_KEK_LIFESPAN_DAYS * MILLIS_IN_DAY;
302        (now_ms - created_at_ms) >= lifespan_ms
303    }
304
305    /// Find the latest non-expired KEK for the table's master key.
306    fn find_active_kek(&self) -> Result<Option<EncryptedKey>> {
307        let keys = self
308            .encryption_keys
309            .read()
310            .expect("encryption_keys lock poisoned");
311        Ok(keys
312            .values()
313            .filter(|kek| {
314                kek.encrypted_by_id()
315                    .map(|id| id == self.table_key_id)
316                    .unwrap_or(false)
317                    && !self.is_kek_expired(kek)
318            })
319            .max_by_key(|kek| {
320                kek.properties()
321                    .get(KEK_CREATED_AT_PROPERTY)
322                    .and_then(|ts| ts.parse::<i64>().ok())
323                    .unwrap_or(0)
324            })
325            .cloned())
326    }
327
328    /// Unwrap a KEK using the KMS, with caching to avoid repeated calls.
329    async fn unwrap_key_encryption_key(&self, kek: &EncryptedKey) -> Result<SensitiveBytes> {
330        let cache_key = kek.key_id().to_string();
331
332        if let Some(cached) = self.kek_cache.get(&cache_key).await {
333            return Ok(cached);
334        }
335
336        let master_key_id = kek.encrypted_by_id().ok_or_else(|| {
337            Error::new(
338                ErrorKind::DataInvalid,
339                format!("KEK '{}' has no encrypted_by_id", kek.key_id()),
340            )
341        })?;
342
343        let plaintext = self
344            .kms_client
345            .unwrap_key(kek.encrypted_key_metadata(), master_key_id)
346            .await?;
347
348        self.kek_cache.insert(cache_key, plaintext.clone()).await;
349
350        Ok(plaintext)
351    }
352
353    /// Decrypt a wrapped DEK using the KEK identified by `kek_key_id`,
354    /// looked up in the manager's own `encryption_keys` map.
355    async fn decrypt_dek(&self, kek_key_id: &str, wrapped_dek: &[u8]) -> Result<SensitiveBytes> {
356        let kek = self
357            .encryption_keys
358            .read()
359            .expect("encryption_keys lock poisoned")
360            .get(kek_key_id)
361            .cloned()
362            .ok_or_else(|| {
363                Error::new(
364                    ErrorKind::DataInvalid,
365                    format!("KEK not found in encryption keys: {kek_key_id}"),
366                )
367            })?;
368
369        // KEK timestamp as AAD prevents timestamp tampering.
370        let aad = Self::kek_timestamp_aad(&kek)?;
371
372        let kek_bytes = self.unwrap_key_encryption_key(&kek).await?;
373        self.unwrap_dek_with_kek(wrapped_dek, &kek_bytes, Some(aad))
374            .map_err(|e| {
375                Error::new(
376                    e.kind(),
377                    format!("Failed to unwrap key metadata with KEK '{kek_key_id}'"),
378                )
379                .with_source(e)
380            })
381    }
382
383    /// Extract the KEK timestamp for use as AAD. Returns an error if missing.
384    fn kek_timestamp_aad(kek: &EncryptedKey) -> Result<&[u8]> {
385        kek.properties()
386            .get(KEK_CREATED_AT_PROPERTY)
387            .map(|ts| ts.as_bytes())
388            .ok_or_else(|| {
389                Error::new(
390                    ErrorKind::DataInvalid,
391                    format!(
392                        "KEK '{}' is missing required '{}' property",
393                        kek.key_id(),
394                        KEK_CREATED_AT_PROPERTY
395                    ),
396                )
397            })
398    }
399
400    /// Generate a random AAD prefix for file encryption.
401    fn generate_aad_prefix() -> Box<[u8]> {
402        let mut prefix = vec![0u8; AAD_PREFIX_LENGTH];
403        OsRng.fill_bytes(&mut prefix);
404        prefix.into_boxed_slice()
405    }
406
407    /// Wrap a DEK with a KEK using local AES-GCM.
408    fn wrap_dek_with_kek(
409        &self,
410        dek: &[u8],
411        kek: &SensitiveBytes,
412        aad: Option<&[u8]>,
413    ) -> Result<Vec<u8>> {
414        let key = SecureKey::try_from(kek.clone())?;
415        let cipher = AesGcmCipher::new(key);
416        cipher.encrypt(dek, aad)
417    }
418
419    /// Unwrap a DEK with a KEK using local AES-GCM.
420    fn unwrap_dek_with_kek(
421        &self,
422        wrapped_dek: &[u8],
423        kek: &SensitiveBytes,
424        aad: Option<&[u8]>,
425    ) -> Result<SensitiveBytes> {
426        let key = SecureKey::try_from(kek.clone())?;
427        let cipher = AesGcmCipher::new(key);
428        cipher.decrypt(wrapped_dek, aad).map(SensitiveBytes::new)
429    }
430}
431
432#[cfg(test)]
433mod tests {
434    use super::*;
435    use crate::encryption::EncryptedInputFile;
436    use crate::encryption::kms::MemoryKeyManagementClient;
437
438    fn create_test_kms() -> Arc<dyn KeyManagementClient> {
439        let kms = MemoryKeyManagementClient::new();
440        kms.add_master_key("master-1").unwrap();
441        Arc::new(kms)
442    }
443
444    fn create_test_manager() -> EncryptionManager {
445        EncryptionManager::builder()
446            .kms_client(create_test_kms())
447            .table_key_id("master-1")
448            .build()
449    }
450
451    #[tokio::test]
452    async fn test_create_kek() {
453        let mgr = create_test_manager();
454        let kek = mgr.create_kek().await.unwrap();
455
456        assert!(!kek.key_id().is_empty());
457        assert!(!kek.encrypted_key_metadata().is_empty());
458        assert_eq!(kek.encrypted_by_id(), Some("master-1"));
459        assert!(kek.properties().contains_key(KEK_CREATED_AT_PROPERTY));
460    }
461
462    fn sample_key_metadata() -> StandardKeyMetadata {
463        StandardKeyMetadata::new(b"0123456789abcdef").with_aad_prefix(b"test-aad-prefix!")
464    }
465
466    #[tokio::test]
467    async fn test_wrap_unwrap_key_metadata_roundtrip() {
468        let mgr = create_test_manager();
469        let plaintext = sample_key_metadata();
470
471        let key_id = mgr
472            .encrypt_manifest_list_key_metadata(&plaintext)
473            .await
474            .unwrap();
475
476        // First wrap should create a new KEK and the wrapped entry — both stored on the manager
477        assert_eq!(mgr.with_encryption_keys(|k| k.len()), 2);
478
479        let decrypted = mgr
480            .decrypt_manifest_list_key_metadata(&key_id)
481            .await
482            .unwrap();
483        assert_eq!(decrypted, plaintext);
484    }
485
486    #[tokio::test]
487    async fn test_kek_reuse_when_not_expired() {
488        let mgr = create_test_manager();
489
490        // First wrap creates a new KEK + wrapped entry (2 keys)
491        let _id1 = mgr
492            .encrypt_manifest_list_key_metadata(&sample_key_metadata())
493            .await
494            .unwrap();
495        let kek_id = mgr.with_encryption_keys(|keys| {
496            assert_eq!(keys.len(), 2);
497            keys.values()
498                .find(|k| k.encrypted_by_id() == Some("master-1"))
499                .unwrap()
500                .key_id()
501                .to_string()
502        });
503
504        // Second wrap should reuse the existing KEK (only adds 1 new wrapped entry)
505        let id2 = mgr
506            .encrypt_manifest_list_key_metadata(&sample_key_metadata())
507            .await
508            .unwrap();
509        let entry2 = mgr.with_encryption_keys(|keys| {
510            assert_eq!(keys.len(), 3);
511            keys.get(&id2).cloned().unwrap()
512        });
513        assert_eq!(entry2.encrypted_by_id(), Some(kek_id.as_str()));
514    }
515
516    #[tokio::test]
517    async fn test_kek_rotation_when_expired() {
518        let kms = create_test_kms();
519
520        // Create a KEK with a timestamp 3 years in the past (exceeds 730-day lifespan)
521        let three_years_ago_ms = Utc::now().timestamp_millis() - (3 * 365 * MILLIS_IN_DAY);
522        let mut properties = HashMap::new();
523        properties.insert(
524            KEK_CREATED_AT_PROPERTY.to_string(),
525            three_years_ago_ms.to_string(),
526        );
527
528        // Wrap a real KEK so unwrap works if needed
529        let kek_key = SecureKey::generate(AesKeySize::Bits128);
530        let wrapped = kms.wrap_key(kek_key.as_bytes(), "master-1").await.unwrap();
531
532        let old_kek = EncryptedKey::builder()
533            .key_id("expired-kek")
534            .encrypted_key_metadata(wrapped)
535            .encrypted_by_id("master-1")
536            .properties(properties)
537            .build();
538
539        // Build manager with the expired KEK
540        let mgr = EncryptionManager::builder()
541            .kms_client(kms)
542            .table_key_id("master-1")
543            .add_encryption_key(old_kek.clone())
544            .build();
545
546        // Wrap should rotate to a new KEK since the existing one is expired
547        let new_entry_id = mgr
548            .encrypt_manifest_list_key_metadata(&sample_key_metadata())
549            .await
550            .unwrap();
551        let entry = mgr
552            .with_encryption_keys(|keys| keys.get(&new_entry_id).cloned())
553            .unwrap();
554        let used_kek_id = entry.encrypted_by_id().unwrap();
555        assert_ne!(used_kek_id, old_kek.key_id());
556    }
557
558    #[tokio::test]
559    async fn test_is_kek_expired_no_timestamp() {
560        let mgr = create_test_manager();
561
562        // KEK without a created-at timestamp -> treated as expired
563        let kek = EncryptedKey::builder()
564            .key_id("no-ts")
565            .encrypted_key_metadata(vec![0u8; 32])
566            .build();
567
568        assert!(mgr.is_kek_expired(&kek));
569    }
570
571    #[tokio::test]
572    async fn test_decrypt_with_unknown_key_id() {
573        let mgr = create_test_manager();
574        let result = mgr.decrypt_manifest_list_key_metadata("nonexistent").await;
575        assert!(result.is_err());
576    }
577
578    #[tokio::test]
579    async fn test_kek_cache_hit() {
580        let mgr = create_test_manager();
581
582        // First wrap caches the plaintext KEK during create_kek().
583        let key_id = mgr
584            .encrypt_manifest_list_key_metadata(&sample_key_metadata())
585            .await
586            .unwrap();
587
588        // Decrypt unwraps the KEK; with the cache populated this should not hit KMS again.
589        let _ = mgr
590            .decrypt_manifest_list_key_metadata(&key_id)
591            .await
592            .unwrap();
593    }
594
595    #[tokio::test]
596    async fn test_unwrap_fails_when_kek_missing_timestamp() {
597        let mgr = create_test_manager();
598
599        // Wrap some metadata to get a valid encrypted entry stored on the manager
600        let entry_id = mgr
601            .encrypt_manifest_list_key_metadata(&sample_key_metadata())
602            .await
603            .unwrap();
604
605        // Find the KEK that wrapped the entry and replace it with a copy that
606        // is missing the KEY_TIMESTAMP property, simulating a malformed table.
607        let mut keys = mgr.with_encryption_keys(|k| k.clone());
608        let kek_id = keys
609            .get(&entry_id)
610            .unwrap()
611            .encrypted_by_id()
612            .unwrap()
613            .to_string();
614        let kek = keys.remove(&kek_id).unwrap();
615        let kek_no_ts = EncryptedKey::builder()
616            .key_id(kek.key_id())
617            .encrypted_key_metadata(kek.encrypted_key_metadata())
618            .encrypted_by_id(kek.encrypted_by_id().unwrap())
619            .build();
620        keys.insert(kek_no_ts.key_id().to_string(), kek_no_ts);
621
622        let mgr = EncryptionManager::builder()
623            .kms_client(create_test_kms())
624            .table_key_id("master-1")
625            .encryption_keys(keys)
626            .build();
627
628        let result = mgr.decrypt_manifest_list_key_metadata(&entry_id).await;
629        assert!(result.is_err());
630        let err = result.unwrap_err();
631        assert_eq!(err.kind(), ErrorKind::DataInvalid);
632        assert!(
633            err.to_string().contains(KEK_CREATED_AT_PROPERTY),
634            "error should mention the missing property: {err}"
635        );
636    }
637
638    #[tokio::test]
639    async fn test_unwrap_fails_when_kek_timestamp_tampered() {
640        let mgr = create_test_manager();
641
642        // Wrap metadata normally
643        let entry_id = mgr
644            .encrypt_manifest_list_key_metadata(&sample_key_metadata())
645            .await
646            .unwrap();
647
648        // Tamper with the KEK timestamp (change the AAD)
649        let mut keys = mgr.with_encryption_keys(|k| k.clone());
650        let kek_id = keys
651            .get(&entry_id)
652            .unwrap()
653            .encrypted_by_id()
654            .unwrap()
655            .to_string();
656        let kek = keys.remove(&kek_id).unwrap();
657        let mut tampered_properties = kek.properties().clone();
658        tampered_properties.insert(KEK_CREATED_AT_PROPERTY.to_string(), "9999999".to_string());
659        let tampered_kek = EncryptedKey::builder()
660            .key_id(kek.key_id())
661            .encrypted_key_metadata(kek.encrypted_key_metadata())
662            .encrypted_by_id(kek.encrypted_by_id().unwrap())
663            .properties(tampered_properties)
664            .build();
665        keys.insert(tampered_kek.key_id().to_string(), tampered_kek);
666
667        let mgr = EncryptionManager::builder()
668            .kms_client(create_test_kms())
669            .table_key_id("master-1")
670            .encryption_keys(keys)
671            .build();
672
673        // Unwrap should fail because the AAD (timestamp) doesn't match what was used to wrap
674        let result = mgr.decrypt_manifest_list_key_metadata(&entry_id).await;
675        assert!(
676            result.is_err(),
677            "tampered timestamp should cause decryption failure"
678        );
679    }
680
681    #[tokio::test]
682    async fn test_encrypt_decrypt_roundtrip() {
683        use crate::io::FileIO;
684
685        let io = FileIO::new_with_memory();
686        let path = "memory:///test/encrypt_roundtrip.bin";
687
688        let kms = MemoryKeyManagementClient::new();
689        kms.add_master_key("master-1").unwrap();
690        let mgr = EncryptionManager::builder()
691            .kms_client(Arc::new(kms) as Arc<dyn KeyManagementClient>)
692            .table_key_id("master-1")
693            .build();
694
695        let output = io.new_output(path).unwrap();
696        let encrypted_output = mgr.encrypt(output);
697
698        let plaintext = b"Hello, encrypted Iceberg round-trip!";
699        let serialized_metadata = encrypted_output.key_metadata().encode().unwrap();
700        encrypted_output
701            .write(bytes::Bytes::from(plaintext.to_vec()))
702            .await
703            .unwrap();
704
705        let input = io.new_input(path).unwrap();
706        let parsed_metadata = StandardKeyMetadata::decode(&serialized_metadata).unwrap();
707        let decrypted_file = EncryptedInputFile::new(input, parsed_metadata);
708
709        let content = decrypted_file.read().await.unwrap();
710        assert_eq!(&content[..], plaintext);
711    }
712}