Skip to main content

iceberg/spec/
table_properties.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::Display;
20use std::str::FromStr;
21
22use crate::compression::CompressionCodec;
23use crate::error::{Error, ErrorKind, Result};
24
25fn parse_property<T: FromStr>(
26    properties: &HashMap<String, String>,
27    key: &str,
28    default: T,
29) -> Result<T>
30where
31    <T as FromStr>::Err: Display,
32{
33    properties.get(key).map_or(Ok(default), |value| {
34        value.parse::<T>().map_err(|e| {
35            Error::new(
36                ErrorKind::DataInvalid,
37                format!("Invalid value for {key}: {e}"),
38            )
39        })
40    })
41}
42
43/// Parse compression codec for metadata files from table properties.
44/// Retrieves the compression codec property, applies defaults, and parses the value.
45/// Only "none" (or empty string) and "gzip" are supported for metadata compression.
46///
47/// # Arguments
48///
49/// * `properties` - HashMap containing table properties
50///
51/// # Errors
52///
53/// Returns an error if the codec is not "none", "", or "gzip" (case-insensitive).
54/// Lz4 and Zstd are not supported for metadata file compression.
55pub(crate) fn parse_metadata_file_compression(
56    properties: &HashMap<String, String>,
57) -> Result<CompressionCodec> {
58    let value = properties
59        .get(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC)
60        .map(|s| s.as_str())
61        .unwrap_or(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT);
62
63    // Handle empty string as None
64    if value.is_empty() {
65        return Ok(CompressionCodec::None);
66    }
67
68    // Lowercase the value for case-insensitive parsing
69    let lowercase_value = value.to_lowercase();
70
71    // Use serde to parse the codec (which has rename_all = "lowercase")
72    let codec: CompressionCodec = serde_json::from_value(serde_json::Value::String(
73        lowercase_value,
74    ))
75    .map_err(|_| {
76        Error::new(
77            ErrorKind::DataInvalid,
78            format!(
79                "Invalid metadata compression codec: {value}. Only '{}' and '{}' are supported.",
80                CompressionCodec::None.name(),
81                CompressionCodec::gzip_default().name()
82            ),
83        )
84    })?;
85
86    // Validate that only None and Gzip are used for metadata
87    match codec {
88        CompressionCodec::None | CompressionCodec::Gzip(_) => Ok(codec),
89        _ => Err(Error::new(
90            ErrorKind::DataInvalid,
91            format!(
92                "Invalid metadata compression codec: {value}. Only '{}' and '{}' are supported for metadata files.",
93                CompressionCodec::None.name(),
94                CompressionCodec::gzip_default().name()
95            ),
96        )),
97    }
98}
99
/// Typed view of the table properties that this module knows how to parse.
///
/// Instances are built through `TryFrom<&HashMap<String, String>>`. Apart from
/// `encryption_key_id`, every field falls back to the matching
/// `PROPERTY_*_DEFAULT` constant when its key is absent from the map.
#[derive(Debug)]
pub struct TableProperties {
    /// The number of times to retry a commit (`commit.retry.num-retries`).
    pub commit_num_retries: usize,
    /// The minimum wait time between retries, in milliseconds
    /// (`commit.retry.min-wait-ms`).
    pub commit_min_retry_wait_ms: u64,
    /// The maximum wait time between retries, in milliseconds
    /// (`commit.retry.max-wait-ms`).
    pub commit_max_retry_wait_ms: u64,
    /// The total timeout for commit retries, in milliseconds
    /// (`commit.retry.total-timeout-ms`).
    pub commit_total_retry_timeout_ms: u64,
    /// The default format for data files (`write.format.default`), kept as the
    /// raw string from the property map.
    pub write_format_default: String,
    /// The target size, in bytes, for newly written files
    /// (`write.target-file-size-bytes`).
    pub write_target_file_size_bytes: usize,
    /// Compression codec for metadata files (JSON), from
    /// `write.metadata.compression-codec`.
    pub metadata_compression_codec: CompressionCodec,
    /// Whether to use `FanoutWriter` for partitioned tables
    /// (`write.datafusion.fanout.enabled`).
    pub write_datafusion_fanout_enabled: bool,
    /// Whether garbage collection is enabled on drop (`gc.enabled`).
    /// When `false`, data files will not be deleted when a table is dropped.
    pub gc_enabled: bool,
    /// Default maximum age, in milliseconds, of a snapshot to keep when expiring
    /// snapshots (`history.expire.max-snapshot-age-ms`).
    pub max_snapshot_age_ms: i64,
    /// Default minimum number of snapshots to keep per branch when expiring
    /// snapshots (`history.expire.min-snapshots-to-keep`).
    pub min_snapshots_to_keep: usize,
    /// Default maximum age, in milliseconds, of a snapshot reference to keep when
    /// expiring snapshots (`history.expire.max-ref-age-ms`).
    pub max_ref_age_ms: i64,
    /// Whether content-defined chunking is enabled.
    /// `true` only when `write.parquet.content-defined-chunking.enabled = "true"`.
    pub cdc_enabled: bool,
    /// Content-defined chunking minimum chunk size in bytes
    /// (`write.parquet.content-defined-chunking.min-chunk-size`).
    pub cdc_min_chunk_size: usize,
    /// Content-defined chunking maximum chunk size in bytes
    /// (`write.parquet.content-defined-chunking.max-chunk-size`).
    pub cdc_max_chunk_size: usize,
    /// Content-defined chunking normalization level (gearhash bit adjustment),
    /// from `write.parquet.content-defined-chunking.norm-level`.
    pub cdc_norm_level: i32,
    /// The master key id used to encrypt this table's manifest list and data
    /// files. `None` if `encryption.key-id` is not set.
    pub encryption_key_id: Option<String>,
    /// The encryption data encryption key length in bytes
    /// (`encryption.data-key-length`).
    pub encryption_data_key_length: usize,
}
143
/// Property keys recognised on a table, each paired with a `*_DEFAULT` constant
/// where a default value exists.
impl TableProperties {
    /// Reserved table property for table format version.
    ///
    /// Iceberg will default a new table's format version to the latest stable and recommended
    /// version. This reserved property keyword allows users to override the Iceberg format version of
    /// the table metadata.
    ///
    /// If this table property exists when creating a table, the table will use the specified format
    /// version. If a table updates this property, it will try to upgrade to the specified format
    /// version.
    pub const PROPERTY_FORMAT_VERSION: &str = "format-version";
    /// Reserved table property for table UUID.
    pub const PROPERTY_UUID: &str = "uuid";
    /// Reserved table property for the total number of snapshots.
    pub const PROPERTY_SNAPSHOT_COUNT: &str = "snapshot-count";
    /// Reserved table property for current snapshot summary.
    pub const PROPERTY_CURRENT_SNAPSHOT_SUMMARY: &str = "current-snapshot-summary";
    /// Reserved table property for current snapshot id.
    pub const PROPERTY_CURRENT_SNAPSHOT_ID: &str = "current-snapshot-id";
    /// Reserved table property for current snapshot timestamp (milliseconds, per the key name).
    pub const PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP: &str = "current-snapshot-timestamp-ms";
    /// Reserved table property for the JSON representation of current schema.
    pub const PROPERTY_CURRENT_SCHEMA: &str = "current-schema";
    /// Reserved table property for the JSON representation of current(default) partition spec.
    pub const PROPERTY_DEFAULT_PARTITION_SPEC: &str = "default-partition-spec";
    /// Reserved table property for the JSON representation of current(default) sort order.
    pub const PROPERTY_DEFAULT_SORT_ORDER: &str = "default-sort-order";

    /// Property key for max number of previous versions to keep.
    pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX: &str =
        "write.metadata.previous-versions-max";
    /// Default value for max number of previous versions to keep.
    pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100;

    /// Property key for max number of partitions to keep summary stats for.
    pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT: &str = "write.summary.partition-limit";
    /// Default value for the max number of partitions to keep summary stats for.
    pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT: u64 = 0;

    /// Reserved Iceberg table properties list.
    ///
    /// Reserved table properties are only used to control behaviors when creating or updating a
    /// table. The values of these properties are not persisted as a part of the table metadata.
    pub const RESERVED_PROPERTIES: [&str; 9] = [
        Self::PROPERTY_FORMAT_VERSION,
        Self::PROPERTY_UUID,
        Self::PROPERTY_SNAPSHOT_COUNT,
        Self::PROPERTY_CURRENT_SNAPSHOT_ID,
        Self::PROPERTY_CURRENT_SNAPSHOT_SUMMARY,
        Self::PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP,
        Self::PROPERTY_CURRENT_SCHEMA,
        Self::PROPERTY_DEFAULT_PARTITION_SPEC,
        Self::PROPERTY_DEFAULT_SORT_ORDER,
    ];

    /// Property key for number of commit retries.
    pub const PROPERTY_COMMIT_NUM_RETRIES: &str = "commit.retry.num-retries";
    /// Default value for number of commit retries.
    pub const PROPERTY_COMMIT_NUM_RETRIES_DEFAULT: usize = 4;

    /// Property key for minimum wait time (ms) between retries.
    pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS: &str = "commit.retry.min-wait-ms";
    /// Default value for minimum wait time (ms) between retries.
    pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT: u64 = 100;

    /// Property key for maximum wait time (ms) between retries.
    pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS: &str = "commit.retry.max-wait-ms";
    /// Default value for maximum wait time (ms) between retries.
    pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT: u64 = 60 * 1000; // 1 minute

    /// Property key for total maximum retry time (ms).
    pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS: &str = "commit.retry.total-timeout-ms";
    /// Default value for total maximum retry time (ms).
    pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000; // 30 minutes

    /// Property key for the default file format of data files.
    pub const PROPERTY_DEFAULT_FILE_FORMAT: &str = "write.format.default";
    /// Property key for the default file format of delete files.
    pub const PROPERTY_DELETE_DEFAULT_FILE_FORMAT: &str = "write.delete.format.default";
    /// Default value for the data file format.
    pub const PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT: &str = "parquet";

    /// Property key for the target size, in bytes, of newly written files.
    pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = "write.target-file-size-bytes";
    /// Default target file size, in bytes.
    pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 * 1024; // 512 MB

    /// Property key for the compression codec of metadata files (JSON).
    pub const PROPERTY_METADATA_COMPRESSION_CODEC: &str = "write.metadata.compression-codec";
    /// Default metadata compression codec - uncompressed
    pub const PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT: &str = "none";

    /// Whether to use `FanoutWriter` for partitioned tables (handles unsorted data).
    /// If false, uses `ClusteredWriter` (requires sorted data, more memory efficient).
    pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED: &str = "write.datafusion.fanout.enabled";
    /// Default value for fanout writer enabled
    pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT: bool = true;

    /// Property key for enabling garbage collection on drop.
    /// When set to `false`, data files will not be deleted when a table is dropped.
    /// Defaults to `true`.
    pub const PROPERTY_GC_ENABLED: &str = "gc.enabled";
    /// Default value for gc.enabled
    pub const PROPERTY_GC_ENABLED_DEFAULT: bool = true;

    /// Property key for the default maximum age of a snapshot to keep when expiring snapshots.
    pub const PROPERTY_MAX_SNAPSHOT_AGE_MS: &str = "history.expire.max-snapshot-age-ms";
    /// Default value for history.expire.max-snapshot-age-ms (5 days).
    pub const PROPERTY_MAX_SNAPSHOT_AGE_MS_DEFAULT: i64 = 5 * 24 * 60 * 60 * 1000;
    /// Property key for the default minimum number of snapshots to keep when expiring snapshots.
    pub const PROPERTY_MIN_SNAPSHOTS_TO_KEEP: &str = "history.expire.min-snapshots-to-keep";
    /// Default value for history.expire.min-snapshots-to-keep.
    pub const PROPERTY_MIN_SNAPSHOTS_TO_KEEP_DEFAULT: usize = 1;
    /// Property key for the default maximum age of a snapshot reference to keep when expiring.
    pub const PROPERTY_MAX_REF_AGE_MS: &str = "history.expire.max-ref-age-ms";
    /// Default value for history.expire.max-ref-age-ms (effectively never expire refs).
    pub const PROPERTY_MAX_REF_AGE_MS_DEFAULT: i64 = i64::MAX;

    /// Enable content-defined chunking with parquet defaults (or per-property overrides).
    pub const PROPERTY_PARQUET_CDC_ENABLED: &str = "write.parquet.content-defined-chunking.enabled";
    /// Default value for content-defined chunking enabled.
    pub const PROPERTY_PARQUET_CDC_ENABLED_DEFAULT: bool = false;
    /// Minimum chunk size in bytes for content-defined chunking.
    pub const PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE: &str =
        "write.parquet.content-defined-chunking.min-chunk-size";
    /// Default matches `parquet::file::properties::DEFAULT_CDC_MIN_CHUNK_SIZE`.
    pub const PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE_DEFAULT: usize = 256 * 1024;
    /// Maximum chunk size in bytes for content-defined chunking.
    pub const PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE: &str =
        "write.parquet.content-defined-chunking.max-chunk-size";
    /// Default matches `parquet::file::properties::DEFAULT_CDC_MAX_CHUNK_SIZE`.
    pub const PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE_DEFAULT: usize = 1024 * 1024;
    /// Normalization level (gearhash bit adjustment) for content-defined chunking.
    pub const PROPERTY_PARQUET_CDC_NORM_LEVEL: &str =
        "write.parquet.content-defined-chunking.norm-level";
    /// Default matches `parquet::file::properties::DEFAULT_CDC_NORM_LEVEL`.
    pub const PROPERTY_PARQUET_CDC_NORM_LEVEL_DEFAULT: i32 = 0;

    /// Property key for the master key id used to encrypt the table's manifest
    /// list and data files as defined in <https://iceberg.apache.org/docs/nightly/encryption/>.
    pub const PROPERTY_ENCRYPTION_KEY_ID: &str = "encryption.key-id";

    /// Property key for the encryption data encryption key (DEK) length in bytes.
    pub const PROPERTY_ENCRYPTION_DATA_KEY_LENGTH: &str = "encryption.data-key-length";
    /// Default value for the encryption DEK length (16 bytes = AES-128).
    pub const PROPERTY_ENCRYPTION_DATA_KEY_LENGTH_DEFAULT: usize = 16;
}
290
291impl TryFrom<&HashMap<String, String>> for TableProperties {
292    // parse by entry key or use default value
293    type Error = Error;
294
295    fn try_from(props: &HashMap<String, String>) -> Result<Self> {
296        Ok(TableProperties {
297            commit_num_retries: parse_property(
298                props,
299                TableProperties::PROPERTY_COMMIT_NUM_RETRIES,
300                TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT,
301            )?,
302            commit_min_retry_wait_ms: parse_property(
303                props,
304                TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS,
305                TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
306            )?,
307            commit_max_retry_wait_ms: parse_property(
308                props,
309                TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS,
310                TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
311            )?,
312            commit_total_retry_timeout_ms: parse_property(
313                props,
314                TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS,
315                TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
316            )?,
317            write_format_default: parse_property(
318                props,
319                TableProperties::PROPERTY_DEFAULT_FILE_FORMAT,
320                TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string(),
321            )?,
322            write_target_file_size_bytes: parse_property(
323                props,
324                TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES,
325                TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
326            )?,
327            metadata_compression_codec: parse_metadata_file_compression(props)?,
328            write_datafusion_fanout_enabled: parse_property(
329                props,
330                TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED,
331                TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT,
332            )?,
333            gc_enabled: parse_property(
334                props,
335                TableProperties::PROPERTY_GC_ENABLED,
336                TableProperties::PROPERTY_GC_ENABLED_DEFAULT,
337            )?,
338            max_snapshot_age_ms: parse_property(
339                props,
340                TableProperties::PROPERTY_MAX_SNAPSHOT_AGE_MS,
341                TableProperties::PROPERTY_MAX_SNAPSHOT_AGE_MS_DEFAULT,
342            )?,
343            min_snapshots_to_keep: parse_property(
344                props,
345                TableProperties::PROPERTY_MIN_SNAPSHOTS_TO_KEEP,
346                TableProperties::PROPERTY_MIN_SNAPSHOTS_TO_KEEP_DEFAULT,
347            )?,
348            max_ref_age_ms: parse_property(
349                props,
350                TableProperties::PROPERTY_MAX_REF_AGE_MS,
351                TableProperties::PROPERTY_MAX_REF_AGE_MS_DEFAULT,
352            )?,
353            cdc_enabled: parse_property(
354                props,
355                TableProperties::PROPERTY_PARQUET_CDC_ENABLED,
356                TableProperties::PROPERTY_PARQUET_CDC_ENABLED_DEFAULT,
357            )?,
358            cdc_min_chunk_size: parse_property(
359                props,
360                TableProperties::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE,
361                TableProperties::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE_DEFAULT,
362            )?,
363            cdc_max_chunk_size: parse_property(
364                props,
365                TableProperties::PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE,
366                TableProperties::PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE_DEFAULT,
367            )?,
368            cdc_norm_level: parse_property(
369                props,
370                TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL,
371                TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL_DEFAULT,
372            )?,
373            encryption_key_id: props
374                .get(TableProperties::PROPERTY_ENCRYPTION_KEY_ID)
375                .cloned(),
376            encryption_data_key_length: parse_property(
377                props,
378                TableProperties::PROPERTY_ENCRYPTION_DATA_KEY_LENGTH,
379                TableProperties::PROPERTY_ENCRYPTION_DATA_KEY_LENGTH_DEFAULT,
380            )?,
381        })
382    }
383}
384
385#[cfg(test)]
386mod tests {
387    use super::*;
388    use crate::compression::CompressionCodec;
389
390    #[test]
391    fn test_table_properties_default() {
392        let props = HashMap::new();
393        let table_properties = TableProperties::try_from(&props).unwrap();
394        assert_eq!(
395            table_properties.commit_num_retries,
396            TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT
397        );
398        assert_eq!(
399            table_properties.commit_min_retry_wait_ms,
400            TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT
401        );
402        assert_eq!(
403            table_properties.commit_max_retry_wait_ms,
404            TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT
405        );
406        assert_eq!(
407            table_properties.write_format_default,
408            TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string()
409        );
410        assert_eq!(
411            table_properties.write_target_file_size_bytes,
412            TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
413        );
414        // Test compression defaults (none means CompressionCodec::None)
415        assert_eq!(
416            table_properties.metadata_compression_codec,
417            CompressionCodec::None
418        );
419        assert_eq!(
420            table_properties.gc_enabled,
421            TableProperties::PROPERTY_GC_ENABLED_DEFAULT
422        );
423        assert_eq!(
424            table_properties.max_snapshot_age_ms,
425            TableProperties::PROPERTY_MAX_SNAPSHOT_AGE_MS_DEFAULT
426        );
427        assert_eq!(
428            table_properties.min_snapshots_to_keep,
429            TableProperties::PROPERTY_MIN_SNAPSHOTS_TO_KEEP_DEFAULT
430        );
431        assert_eq!(
432            table_properties.max_ref_age_ms,
433            TableProperties::PROPERTY_MAX_REF_AGE_MS_DEFAULT
434        );
435    }
436
437    #[test]
438    fn test_table_properties_history_expire_overrides() {
439        let props = HashMap::from([
440            (
441                TableProperties::PROPERTY_MAX_SNAPSHOT_AGE_MS.to_string(),
442                "1234".to_string(),
443            ),
444            (
445                TableProperties::PROPERTY_MIN_SNAPSHOTS_TO_KEEP.to_string(),
446                "7".to_string(),
447            ),
448            (
449                TableProperties::PROPERTY_MAX_REF_AGE_MS.to_string(),
450                "5678".to_string(),
451            ),
452        ]);
453        let table_properties = TableProperties::try_from(&props).unwrap();
454        assert_eq!(table_properties.max_snapshot_age_ms, 1234);
455        assert_eq!(table_properties.min_snapshots_to_keep, 7);
456        assert_eq!(table_properties.max_ref_age_ms, 5678);
457    }
458
459    #[test]
460    fn test_table_properties_compression() {
461        let props = HashMap::from([(
462            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
463            "gzip".to_string(),
464        )]);
465        let table_properties = TableProperties::try_from(&props).unwrap();
466        assert_eq!(
467            table_properties.metadata_compression_codec,
468            CompressionCodec::gzip_default()
469        );
470    }
471
472    #[test]
473    fn test_table_properties_compression_none() {
474        let props = HashMap::from([(
475            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
476            "none".to_string(),
477        )]);
478        let table_properties = TableProperties::try_from(&props).unwrap();
479        assert_eq!(
480            table_properties.metadata_compression_codec,
481            CompressionCodec::None
482        );
483    }
484
485    #[test]
486    fn test_table_properties_compression_case_insensitive() {
487        // Test uppercase
488        let props_upper = HashMap::from([(
489            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
490            "GZIP".to_string(),
491        )]);
492        let table_properties = TableProperties::try_from(&props_upper).unwrap();
493        assert_eq!(
494            table_properties.metadata_compression_codec,
495            CompressionCodec::gzip_default()
496        );
497
498        // Test mixed case
499        let props_mixed = HashMap::from([(
500            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
501            "GzIp".to_string(),
502        )]);
503        let table_properties = TableProperties::try_from(&props_mixed).unwrap();
504        assert_eq!(
505            table_properties.metadata_compression_codec,
506            CompressionCodec::gzip_default()
507        );
508
509        // Test "NONE" should also be case-insensitive
510        let props_none_upper = HashMap::from([(
511            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
512            "NONE".to_string(),
513        )]);
514        let table_properties = TableProperties::try_from(&props_none_upper).unwrap();
515        assert_eq!(
516            table_properties.metadata_compression_codec,
517            CompressionCodec::None
518        );
519    }
520
521    #[test]
522    fn test_table_properties_valid() {
523        let props = HashMap::from([
524            (
525                TableProperties::PROPERTY_COMMIT_NUM_RETRIES.to_string(),
526                "10".to_string(),
527            ),
528            (
529                TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS.to_string(),
530                "20".to_string(),
531            ),
532            (
533                TableProperties::PROPERTY_DEFAULT_FILE_FORMAT.to_string(),
534                "avro".to_string(),
535            ),
536            (
537                TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(),
538                "512".to_string(),
539            ),
540            (
541                TableProperties::PROPERTY_GC_ENABLED.to_string(),
542                "false".to_string(),
543            ),
544        ]);
545        let table_properties = TableProperties::try_from(&props).unwrap();
546        assert_eq!(table_properties.commit_num_retries, 10);
547        assert_eq!(table_properties.commit_max_retry_wait_ms, 20);
548        assert_eq!(table_properties.write_format_default, "avro".to_string());
549        assert_eq!(table_properties.write_target_file_size_bytes, 512);
550        assert!(!table_properties.gc_enabled);
551    }
552
553    #[test]
554    fn test_table_properties_invalid() {
555        let invalid_retries = HashMap::from([(
556            TableProperties::PROPERTY_COMMIT_NUM_RETRIES.to_string(),
557            "abc".to_string(),
558        )]);
559
560        let table_properties = TableProperties::try_from(&invalid_retries).unwrap_err();
561        assert!(
562            table_properties.to_string().contains(
563                "Invalid value for commit.retry.num-retries: invalid digit found in string"
564            )
565        );
566
567        let invalid_min_wait = HashMap::from([(
568            TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS.to_string(),
569            "abc".to_string(),
570        )]);
571        let table_properties = TableProperties::try_from(&invalid_min_wait).unwrap_err();
572        assert!(
573            table_properties.to_string().contains(
574                "Invalid value for commit.retry.min-wait-ms: invalid digit found in string"
575            )
576        );
577
578        let invalid_max_wait = HashMap::from([(
579            TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS.to_string(),
580            "abc".to_string(),
581        )]);
582        let table_properties = TableProperties::try_from(&invalid_max_wait).unwrap_err();
583        assert!(
584            table_properties.to_string().contains(
585                "Invalid value for commit.retry.max-wait-ms: invalid digit found in string"
586            )
587        );
588
589        let invalid_target_size = HashMap::from([(
590            TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(),
591            "abc".to_string(),
592        )]);
593        let table_properties = TableProperties::try_from(&invalid_target_size).unwrap_err();
594        assert!(table_properties.to_string().contains(
595            "Invalid value for write.target-file-size-bytes: invalid digit found in string"
596        ));
597
598        let invalid_gc_enabled = HashMap::from([(
599            TableProperties::PROPERTY_GC_ENABLED.to_string(),
600            "notabool".to_string(),
601        )]);
602        let table_properties = TableProperties::try_from(&invalid_gc_enabled).unwrap_err();
603        assert!(
604            table_properties
605                .to_string()
606                .contains("Invalid value for gc.enabled")
607        );
608    }
609
610    #[test]
611    fn test_table_properties_compression_invalid_rejected() {
612        let invalid_codecs = ["lz4", "zstd", "snappy"];
613
614        for codec in invalid_codecs {
615            let props = HashMap::from([(
616                TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
617                codec.to_string(),
618            )]);
619            let err = TableProperties::try_from(&props).unwrap_err();
620            let err_msg = err.to_string();
621            assert!(
622                err_msg.contains(&format!("Invalid metadata compression codec: {codec}")),
623                "Expected error message to contain codec '{codec}', got: {err_msg}"
624            );
625            assert!(
626                err_msg.contains("Only 'none' and 'gzip' are supported"),
627                "Expected error message to contain supported codecs, got: {err_msg}"
628            );
629        }
630    }
631
632    #[test]
633    fn test_parse_metadata_file_compression_valid() {
634        // Test with "none"
635        let props = HashMap::from([(
636            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
637            "none".to_string(),
638        )]);
639        assert_eq!(
640            parse_metadata_file_compression(&props).unwrap(),
641            CompressionCodec::None
642        );
643
644        // Test with empty string
645        let props = HashMap::from([(
646            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
647            "".to_string(),
648        )]);
649        assert_eq!(
650            parse_metadata_file_compression(&props).unwrap(),
651            CompressionCodec::None
652        );
653
654        // Test with "gzip"
655        let props = HashMap::from([(
656            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
657            "gzip".to_string(),
658        )]);
659        assert_eq!(
660            parse_metadata_file_compression(&props).unwrap(),
661            CompressionCodec::gzip_default()
662        );
663
664        // Test case insensitivity - "NONE"
665        let props = HashMap::from([(
666            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
667            "NONE".to_string(),
668        )]);
669        assert_eq!(
670            parse_metadata_file_compression(&props).unwrap(),
671            CompressionCodec::None
672        );
673
674        // Test case insensitivity - "GZIP"
675        let props = HashMap::from([(
676            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
677            "GZIP".to_string(),
678        )]);
679        assert_eq!(
680            parse_metadata_file_compression(&props).unwrap(),
681            CompressionCodec::gzip_default()
682        );
683
684        // Test case insensitivity - "GzIp"
685        let props = HashMap::from([(
686            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
687            "GzIp".to_string(),
688        )]);
689        assert_eq!(
690            parse_metadata_file_compression(&props).unwrap(),
691            CompressionCodec::gzip_default()
692        );
693
694        // Test default when property is missing
695        let props = HashMap::new();
696        assert_eq!(
697            parse_metadata_file_compression(&props).unwrap(),
698            CompressionCodec::None
699        );
700    }
701
/// Checks that "lz4", "zstd" and "snappy" are rejected as metadata compression
/// codecs, and that the error text both flags the codec as invalid and states
/// which codecs are supported.
#[test]
fn test_parse_metadata_file_compression_invalid() {
    for codec in ["lz4", "zstd", "snappy"] {
        // A property map whose single entry selects the unsupported codec.
        let mut props = HashMap::new();
        props.insert(
            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
            codec.to_string(),
        );

        let err_msg = parse_metadata_file_compression(&props)
            .unwrap_err()
            .to_string();

        let flags_invalid_codec = err_msg.contains("Invalid metadata compression codec");
        assert!(
            flags_invalid_codec,
            "Expected error message to contain 'Invalid metadata compression codec', got: {err_msg}"
        );

        let lists_supported_codecs = err_msg.contains("Only 'none' and 'gzip' are supported");
        assert!(
            lists_supported_codecs,
            "Expected error message to contain supported codecs, got: {err_msg}"
        );
    }
}
723
/// An empty property map must yield table properties with CDC turned off.
#[test]
fn test_cdc_disabled_by_default() {
    let empty: HashMap<String, String> = HashMap::new();
    let parsed = TableProperties::try_from(&empty).unwrap();
    assert!(!parsed.cdc_enabled);
}
730
/// Setting only the CDC enable flag to "true" switches CDC on; the chunk sizes
/// and norm level are expected to come back as 256 KiB, 1 MiB and 0.
#[test]
fn test_cdc_enabled_via_flag() {
    let mut props = HashMap::new();
    props.insert(
        TableProperties::PROPERTY_PARQUET_CDC_ENABLED.to_string(),
        "true".to_string(),
    );

    let parsed = TableProperties::try_from(&props).unwrap();

    assert!(parsed.cdc_enabled);
    assert_eq!(parsed.cdc_min_chunk_size, 256 * 1024);
    assert_eq!(parsed.cdc_max_chunk_size, 1024 * 1024);
    assert_eq!(parsed.cdc_norm_level, 0);
}
743
/// Supplying a CDC minimum chunk size without the enable flag must leave CDC
/// disabled.
#[test]
fn test_cdc_size_props_alone_do_not_enable() {
    let size_only: HashMap<String, String> = [(
        TableProperties::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE.to_string(),
        "262144".to_string(),
    )]
    .into_iter()
    .collect();

    let parsed = TableProperties::try_from(&size_only).unwrap();
    assert!(!parsed.cdc_enabled);
}
753
/// When the enable flag, both chunk sizes and the norm level are all given
/// explicitly, each parsed field must carry the supplied value.
#[test]
fn test_cdc_custom_values() {
    let mut props = HashMap::new();
    props.insert(
        TableProperties::PROPERTY_PARQUET_CDC_ENABLED.to_string(),
        "true".to_string(),
    );
    props.insert(
        TableProperties::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE.to_string(),
        "200000".to_string(),
    );
    props.insert(
        TableProperties::PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE.to_string(),
        "900000".to_string(),
    );
    props.insert(
        TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL.to_string(),
        "1".to_string(),
    );

    let parsed = TableProperties::try_from(&props).unwrap();

    assert!(parsed.cdc_enabled);
    assert_eq!(parsed.cdc_min_chunk_size, 200_000);
    assert_eq!(parsed.cdc_max_chunk_size, 900_000);
    assert_eq!(parsed.cdc_norm_level, 1);
}
780
/// Overriding only the norm level (with CDC enabled) must change that one
/// field; the chunk sizes are expected to stay at 256 KiB and 1 MiB.
#[test]
fn test_cdc_partial_override() {
    let mut props = HashMap::new();
    props.insert(
        TableProperties::PROPERTY_PARQUET_CDC_ENABLED.to_string(),
        "true".to_string(),
    );
    props.insert(
        TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL.to_string(),
        "2".to_string(),
    );

    let parsed = TableProperties::try_from(&props).unwrap();

    assert!(parsed.cdc_enabled);
    assert_eq!(parsed.cdc_min_chunk_size, 256 * 1024);
    assert_eq!(parsed.cdc_max_chunk_size, 1024 * 1024);
    assert_eq!(parsed.cdc_norm_level, 2);
}
799
/// A negative norm level ("-2") is accepted and parsed as-is.
#[test]
fn test_cdc_negative_norm_level() {
    let props: HashMap<String, String> = [
        (
            TableProperties::PROPERTY_PARQUET_CDC_ENABLED.to_string(),
            "true".to_string(),
        ),
        (
            TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL.to_string(),
            "-2".to_string(),
        ),
    ]
    .into_iter()
    .collect();

    let parsed = TableProperties::try_from(&props).unwrap();
    assert_eq!(parsed.cdc_norm_level, -2);
}
815
/// A non-numeric minimum chunk size must make the conversion fail with an
/// error that names the offending property key.
#[test]
fn test_cdc_invalid_min_chunk_size() {
    let mut props = HashMap::new();
    props.insert(
        TableProperties::PROPERTY_PARQUET_CDC_ENABLED.to_string(),
        "true".to_string(),
    );
    props.insert(
        TableProperties::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE.to_string(),
        "not_a_number".to_string(),
    );

    let failure = TableProperties::try_from(&props).unwrap_err();
    let rendered = failure.to_string();

    let expected_fragment =
        "Invalid value for write.parquet.content-defined-chunking.min-chunk-size";
    assert!(rendered.contains(expected_fragment));
}
835
/// A non-numeric norm level must make the conversion fail with an error that
/// names the offending property key.
#[test]
fn test_cdc_invalid_norm_level() {
    let mut props = HashMap::new();
    props.insert(
        TableProperties::PROPERTY_PARQUET_CDC_ENABLED.to_string(),
        "true".to_string(),
    );
    props.insert(
        TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL.to_string(),
        "not_a_number".to_string(),
    );

    let failure = TableProperties::try_from(&props).unwrap_err();
    let rendered = failure.to_string();

    let expected_fragment = "Invalid value for write.parquet.content-defined-chunking.norm-level";
    assert!(rendered.contains(expected_fragment));
}
854
/// A map that holds only an unrelated property must leave CDC disabled.
#[test]
fn test_cdc_no_properties() {
    let mut unrelated = HashMap::new();
    unrelated.insert(String::from("some.other.property"), String::from("value"));

    let parsed = TableProperties::try_from(&unrelated).unwrap();
    assert!(!parsed.cdc_enabled);
}
861}