iceberg/spec/
table_properties.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::Display;
20use std::str::FromStr;
21
22use crate::compression::CompressionCodec;
23use crate::error::{Error, ErrorKind, Result};
24
25fn parse_property<T: FromStr>(
26    properties: &HashMap<String, String>,
27    key: &str,
28    default: T,
29) -> Result<T>
30where
31    <T as FromStr>::Err: Display,
32{
33    properties.get(key).map_or(Ok(default), |value| {
34        value.parse::<T>().map_err(|e| {
35            Error::new(
36                ErrorKind::DataInvalid,
37                format!("Invalid value for {key}: {e}"),
38            )
39        })
40    })
41}
42
43/// Parse compression codec for metadata files from table properties.
44/// Retrieves the compression codec property, applies defaults, and parses the value.
45/// Only "none" (or empty string) and "gzip" are supported for metadata compression.
46///
47/// # Arguments
48///
49/// * `properties` - HashMap containing table properties
50///
51/// # Errors
52///
53/// Returns an error if the codec is not "none", "", or "gzip" (case-insensitive).
54/// Lz4 and Zstd are not supported for metadata file compression.
55pub(crate) fn parse_metadata_file_compression(
56    properties: &HashMap<String, String>,
57) -> Result<CompressionCodec> {
58    let value = properties
59        .get(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC)
60        .map(|s| s.as_str())
61        .unwrap_or(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT);
62
63    // Handle empty string as None
64    if value.is_empty() {
65        return Ok(CompressionCodec::None);
66    }
67
68    // Lowercase the value for case-insensitive parsing
69    let lowercase_value = value.to_lowercase();
70
71    // Use serde to parse the codec (which has rename_all = "lowercase")
72    let codec: CompressionCodec = serde_json::from_value(serde_json::Value::String(
73        lowercase_value,
74    ))
75    .map_err(|_| {
76        Error::new(
77            ErrorKind::DataInvalid,
78            format!(
79                "Invalid metadata compression codec: {value}. Only '{}' and '{}' are supported.",
80                CompressionCodec::None.name(),
81                CompressionCodec::gzip_default().name()
82            ),
83        )
84    })?;
85
86    // Validate that only None and Gzip are used for metadata
87    match codec {
88        CompressionCodec::None | CompressionCodec::Gzip(_) => Ok(codec),
89        _ => Err(Error::new(
90            ErrorKind::DataInvalid,
91            format!(
92                "Invalid metadata compression codec: {value}. Only '{}' and '{}' are supported for metadata files.",
93                CompressionCodec::None.name(),
94                CompressionCodec::gzip_default().name()
95            ),
96        )),
97    }
98}
99
100/// TableProperties that contains the properties of a table.
101#[derive(Debug)]
102pub struct TableProperties {
103    /// The number of times to retry a commit.
104    pub commit_num_retries: usize,
105    /// The minimum wait time between retries.
106    pub commit_min_retry_wait_ms: u64,
107    /// The maximum wait time between retries.
108    pub commit_max_retry_wait_ms: u64,
109    /// The total timeout for commit retries.
110    pub commit_total_retry_timeout_ms: u64,
111    /// The default format for files.
112    pub write_format_default: String,
113    /// The target file size for files.
114    pub write_target_file_size_bytes: usize,
115    /// Compression codec for metadata files (JSON)
116    pub metadata_compression_codec: CompressionCodec,
117    /// Whether to use `FanoutWriter` for partitioned tables.
118    pub write_datafusion_fanout_enabled: bool,
119    /// Whether garbage collection is enabled on drop.
120    /// When `false`, data files will not be deleted when a table is dropped.
121    pub gc_enabled: bool,
122    /// Whether content-defined chunking is enabled.
123    /// `true` only when `write.parquet.content-defined-chunking.enabled = "true"`.
124    pub cdc_enabled: bool,
125    /// Content-defined chunking minimum chunk size in bytes.
126    pub cdc_min_chunk_size: usize,
127    /// Content-defined chunking maximum chunk size in bytes.
128    pub cdc_max_chunk_size: usize,
129    /// Content-defined chunking normalization level (gearhash bit adjustment).
130    pub cdc_norm_level: i32,
131}
132
133impl TableProperties {
134    /// Reserved table property for table format version.
135    ///
136    /// Iceberg will default a new table's format version to the latest stable and recommended
137    /// version. This reserved property keyword allows users to override the Iceberg format version of
138    /// the table metadata.
139    ///
140    /// If this table property exists when creating a table, the table will use the specified format
141    /// version. If a table updates this property, it will try to upgrade to the specified format
142    /// version.
143    pub const PROPERTY_FORMAT_VERSION: &str = "format-version";
144    /// Reserved table property for table UUID.
145    pub const PROPERTY_UUID: &str = "uuid";
146    /// Reserved table property for the total number of snapshots.
147    pub const PROPERTY_SNAPSHOT_COUNT: &str = "snapshot-count";
148    /// Reserved table property for current snapshot summary.
149    pub const PROPERTY_CURRENT_SNAPSHOT_SUMMARY: &str = "current-snapshot-summary";
150    /// Reserved table property for current snapshot id.
151    pub const PROPERTY_CURRENT_SNAPSHOT_ID: &str = "current-snapshot-id";
152    /// Reserved table property for current snapshot timestamp.
153    pub const PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP: &str = "current-snapshot-timestamp-ms";
154    /// Reserved table property for the JSON representation of current schema.
155    pub const PROPERTY_CURRENT_SCHEMA: &str = "current-schema";
156    /// Reserved table property for the JSON representation of current(default) partition spec.
157    pub const PROPERTY_DEFAULT_PARTITION_SPEC: &str = "default-partition-spec";
158    /// Reserved table property for the JSON representation of current(default) sort order.
159    pub const PROPERTY_DEFAULT_SORT_ORDER: &str = "default-sort-order";
160
161    /// Property key for max number of previous versions to keep.
162    pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX: &str =
163        "write.metadata.previous-versions-max";
164    /// Default value for max number of previous versions to keep.
165    pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100;
166
167    /// Property key for max number of partitions to keep summary stats for.
168    pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT: &str = "write.summary.partition-limit";
169    /// Default value for the max number of partitions to keep summary stats for.
170    pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT: u64 = 0;
171
172    /// Reserved Iceberg table properties list.
173    ///
174    /// Reserved table properties are only used to control behaviors when creating or updating a
175    /// table. The value of these properties are not persisted as a part of the table metadata.
176    pub const RESERVED_PROPERTIES: [&str; 9] = [
177        Self::PROPERTY_FORMAT_VERSION,
178        Self::PROPERTY_UUID,
179        Self::PROPERTY_SNAPSHOT_COUNT,
180        Self::PROPERTY_CURRENT_SNAPSHOT_ID,
181        Self::PROPERTY_CURRENT_SNAPSHOT_SUMMARY,
182        Self::PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP,
183        Self::PROPERTY_CURRENT_SCHEMA,
184        Self::PROPERTY_DEFAULT_PARTITION_SPEC,
185        Self::PROPERTY_DEFAULT_SORT_ORDER,
186    ];
187
188    /// Property key for number of commit retries.
189    pub const PROPERTY_COMMIT_NUM_RETRIES: &str = "commit.retry.num-retries";
190    /// Default value for number of commit retries.
191    pub const PROPERTY_COMMIT_NUM_RETRIES_DEFAULT: usize = 4;
192
193    /// Property key for minimum wait time (ms) between retries.
194    pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS: &str = "commit.retry.min-wait-ms";
195    /// Default value for minimum wait time (ms) between retries.
196    pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT: u64 = 100;
197
198    /// Property key for maximum wait time (ms) between retries.
199    pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS: &str = "commit.retry.max-wait-ms";
200    /// Default value for maximum wait time (ms) between retries.
201    pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT: u64 = 60 * 1000; // 1 minute
202
203    /// Property key for total maximum retry time (ms).
204    pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS: &str = "commit.retry.total-timeout-ms";
205    /// Default value for total maximum retry time (ms).
206    pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000; // 30 minutes
207
208    /// Default file format for data files
209    pub const PROPERTY_DEFAULT_FILE_FORMAT: &str = "write.format.default";
210    /// Default file format for delete files
211    pub const PROPERTY_DELETE_DEFAULT_FILE_FORMAT: &str = "write.delete.format.default";
212    /// Default value for data file format
213    pub const PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT: &str = "parquet";
214
215    /// Target file size for newly written files.
216    pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = "write.target-file-size-bytes";
217    /// Default target file size
218    pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 * 1024; // 512 MB
219
220    /// Compression codec for metadata files (JSON)
221    pub const PROPERTY_METADATA_COMPRESSION_CODEC: &str = "write.metadata.compression-codec";
222    /// Default metadata compression codec - uncompressed
223    pub const PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT: &str = "none";
224    /// Whether to use `FanoutWriter` for partitioned tables (handles unsorted data).
225    /// If false, uses `ClusteredWriter` (requires sorted data, more memory efficient).
226    pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED: &str = "write.datafusion.fanout.enabled";
227    /// Default value for fanout writer enabled
228    pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT: bool = true;
229
230    /// Property key for enabling garbage collection on drop.
231    /// When set to `false`, data files will not be deleted when a table is dropped.
232    /// Defaults to `true`.
233    pub const PROPERTY_GC_ENABLED: &str = "gc.enabled";
234    /// Default value for gc.enabled
235    pub const PROPERTY_GC_ENABLED_DEFAULT: bool = true;
236
237    /// Enable content-defined chunking with parquet defaults (or per-property overrides).
238    pub const PROPERTY_PARQUET_CDC_ENABLED: &str = "write.parquet.content-defined-chunking.enabled";
239    /// Default value for content-defined chunking enabled.
240    pub const PROPERTY_PARQUET_CDC_ENABLED_DEFAULT: bool = false;
241    /// Minimum chunk size in bytes for content-defined chunking.
242    pub const PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE: &str =
243        "write.parquet.content-defined-chunking.min-chunk-size";
244    /// Default matches `parquet::file::properties::DEFAULT_CDC_MIN_CHUNK_SIZE`.
245    pub const PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE_DEFAULT: usize = 256 * 1024;
246    /// Maximum chunk size in bytes for content-defined chunking.
247    pub const PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE: &str =
248        "write.parquet.content-defined-chunking.max-chunk-size";
249    /// Default matches `parquet::file::properties::DEFAULT_CDC_MAX_CHUNK_SIZE`.
250    pub const PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE_DEFAULT: usize = 1024 * 1024;
251    /// Normalization level (gearhash bit adjustment) for content-defined chunking.
252    pub const PROPERTY_PARQUET_CDC_NORM_LEVEL: &str =
253        "write.parquet.content-defined-chunking.norm-level";
254    /// Default matches `parquet::file::properties::DEFAULT_CDC_NORM_LEVEL`.
255    pub const PROPERTY_PARQUET_CDC_NORM_LEVEL_DEFAULT: i32 = 0;
256}
257
258impl TryFrom<&HashMap<String, String>> for TableProperties {
259    // parse by entry key or use default value
260    type Error = Error;
261
262    fn try_from(props: &HashMap<String, String>) -> Result<Self> {
263        Ok(TableProperties {
264            commit_num_retries: parse_property(
265                props,
266                TableProperties::PROPERTY_COMMIT_NUM_RETRIES,
267                TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT,
268            )?,
269            commit_min_retry_wait_ms: parse_property(
270                props,
271                TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS,
272                TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
273            )?,
274            commit_max_retry_wait_ms: parse_property(
275                props,
276                TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS,
277                TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
278            )?,
279            commit_total_retry_timeout_ms: parse_property(
280                props,
281                TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS,
282                TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
283            )?,
284            write_format_default: parse_property(
285                props,
286                TableProperties::PROPERTY_DEFAULT_FILE_FORMAT,
287                TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string(),
288            )?,
289            write_target_file_size_bytes: parse_property(
290                props,
291                TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES,
292                TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
293            )?,
294            metadata_compression_codec: parse_metadata_file_compression(props)?,
295            write_datafusion_fanout_enabled: parse_property(
296                props,
297                TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED,
298                TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT,
299            )?,
300            gc_enabled: parse_property(
301                props,
302                TableProperties::PROPERTY_GC_ENABLED,
303                TableProperties::PROPERTY_GC_ENABLED_DEFAULT,
304            )?,
305            cdc_enabled: parse_property(
306                props,
307                TableProperties::PROPERTY_PARQUET_CDC_ENABLED,
308                TableProperties::PROPERTY_PARQUET_CDC_ENABLED_DEFAULT,
309            )?,
310            cdc_min_chunk_size: parse_property(
311                props,
312                TableProperties::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE,
313                TableProperties::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE_DEFAULT,
314            )?,
315            cdc_max_chunk_size: parse_property(
316                props,
317                TableProperties::PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE,
318                TableProperties::PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE_DEFAULT,
319            )?,
320            cdc_norm_level: parse_property(
321                props,
322                TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL,
323                TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL_DEFAULT,
324            )?,
325        })
326    }
327}
328
#[cfg(test)]
mod tests {
    // Unit tests for table property parsing: defaults, overrides, invalid values, the metadata
    // compression codec, and the parquet content-defined chunking (CDC) settings.

    use super::*;
    use crate::compression::CompressionCodec;

    /// Builds a property map from borrowed `(key, value)` pairs.
    fn props_from(entries: &[(&str, &str)]) -> HashMap<String, String> {
        entries
            .iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect()
    }

    /// Builds a property map that only sets the metadata compression codec.
    fn codec_props(codec: &str) -> HashMap<String, String> {
        props_from(&[(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC, codec)])
    }

    #[test]
    fn test_table_properties_default() {
        let props: HashMap<String, String> = HashMap::new();
        let table_properties = TableProperties::try_from(&props).unwrap();
        assert_eq!(
            table_properties.commit_num_retries,
            TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT
        );
        assert_eq!(
            table_properties.commit_min_retry_wait_ms,
            TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT
        );
        assert_eq!(
            table_properties.commit_max_retry_wait_ms,
            TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT
        );
        assert_eq!(
            table_properties.commit_total_retry_timeout_ms,
            TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT
        );
        assert_eq!(
            table_properties.write_format_default,
            TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT
        );
        assert_eq!(
            table_properties.write_target_file_size_bytes,
            TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
        );
        // The default codec name "none" maps to `CompressionCodec::None`.
        assert_eq!(
            table_properties.metadata_compression_codec,
            CompressionCodec::None
        );
        assert_eq!(
            table_properties.write_datafusion_fanout_enabled,
            TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT
        );
        assert_eq!(
            table_properties.gc_enabled,
            TableProperties::PROPERTY_GC_ENABLED_DEFAULT
        );
        assert_eq!(
            table_properties.cdc_enabled,
            TableProperties::PROPERTY_PARQUET_CDC_ENABLED_DEFAULT
        );
        assert_eq!(
            table_properties.cdc_min_chunk_size,
            TableProperties::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE_DEFAULT
        );
        assert_eq!(
            table_properties.cdc_max_chunk_size,
            TableProperties::PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE_DEFAULT
        );
        assert_eq!(
            table_properties.cdc_norm_level,
            TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL_DEFAULT
        );
    }

    #[test]
    fn test_table_properties_compression() {
        let table_properties = TableProperties::try_from(&codec_props("gzip")).unwrap();
        assert_eq!(
            table_properties.metadata_compression_codec,
            CompressionCodec::gzip_default()
        );
    }

    #[test]
    fn test_table_properties_compression_none() {
        let table_properties = TableProperties::try_from(&codec_props("none")).unwrap();
        assert_eq!(
            table_properties.metadata_compression_codec,
            CompressionCodec::None
        );
    }

    #[test]
    fn test_table_properties_compression_case_insensitive() {
        // Upper case, mixed case, and an upper-case "NONE".
        let cases = [
            ("GZIP", CompressionCodec::gzip_default()),
            ("GzIp", CompressionCodec::gzip_default()),
            ("NONE", CompressionCodec::None),
        ];

        for (raw, expected) in cases {
            let table_properties = TableProperties::try_from(&codec_props(raw)).unwrap();
            assert_eq!(
                table_properties.metadata_compression_codec, expected,
                "unexpected codec for input '{raw}'"
            );
        }
    }

    #[test]
    fn test_table_properties_valid() {
        let props = props_from(&[
            (TableProperties::PROPERTY_COMMIT_NUM_RETRIES, "10"),
            (TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS, "5"),
            (TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS, "20"),
            (TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS, "3000"),
            (TableProperties::PROPERTY_DEFAULT_FILE_FORMAT, "avro"),
            (TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES, "512"),
            (
                TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED,
                "false",
            ),
            (TableProperties::PROPERTY_GC_ENABLED, "false"),
        ]);
        let table_properties = TableProperties::try_from(&props).unwrap();
        assert_eq!(table_properties.commit_num_retries, 10);
        assert_eq!(table_properties.commit_min_retry_wait_ms, 5);
        assert_eq!(table_properties.commit_max_retry_wait_ms, 20);
        assert_eq!(table_properties.commit_total_retry_timeout_ms, 3000);
        assert_eq!(table_properties.write_format_default, "avro");
        assert_eq!(table_properties.write_target_file_size_bytes, 512);
        assert!(!table_properties.write_datafusion_fanout_enabled);
        assert!(!table_properties.gc_enabled);
    }

    #[test]
    fn test_table_properties_invalid() {
        // Numeric properties: "abc" must be rejected with the integer parse error attached.
        let non_numeric = [
            (
                TableProperties::PROPERTY_COMMIT_NUM_RETRIES,
                "Invalid value for commit.retry.num-retries: invalid digit found in string",
            ),
            (
                TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS,
                "Invalid value for commit.retry.min-wait-ms: invalid digit found in string",
            ),
            (
                TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS,
                "Invalid value for commit.retry.max-wait-ms: invalid digit found in string",
            ),
            (
                TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS,
                "Invalid value for commit.retry.total-timeout-ms: invalid digit found in string",
            ),
            (
                TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES,
                "Invalid value for write.target-file-size-bytes: invalid digit found in string",
            ),
        ];
        for (key, expected) in non_numeric {
            let err = TableProperties::try_from(&props_from(&[(key, "abc")])).unwrap_err();
            let err_msg = err.to_string();
            assert!(
                err_msg.contains(expected),
                "Expected error message to contain '{expected}', got: {err_msg}"
            );
        }

        // Boolean properties: anything other than "true"/"false" must be rejected.
        let non_boolean = [
            (
                TableProperties::PROPERTY_GC_ENABLED,
                "Invalid value for gc.enabled",
            ),
            (
                TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED,
                "Invalid value for write.datafusion.fanout.enabled",
            ),
        ];
        for (key, expected) in non_boolean {
            let err = TableProperties::try_from(&props_from(&[(key, "notabool")])).unwrap_err();
            let err_msg = err.to_string();
            assert!(
                err_msg.contains(expected),
                "Expected error message to contain '{expected}', got: {err_msg}"
            );
        }
    }

    #[test]
    fn test_table_properties_compression_invalid_rejected() {
        let invalid_codecs = ["lz4", "zstd", "snappy"];

        for codec in invalid_codecs {
            let err = TableProperties::try_from(&codec_props(codec)).unwrap_err();
            let err_msg = err.to_string();
            assert!(
                err_msg.contains(&format!("Invalid metadata compression codec: {codec}")),
                "Expected error message to contain codec '{codec}', got: {err_msg}"
            );
            assert!(
                err_msg.contains("Only 'none' and 'gzip' are supported"),
                "Expected error message to contain supported codecs, got: {err_msg}"
            );
        }
    }

    #[test]
    fn test_parse_metadata_file_compression_valid() {
        // Explicit values: "none", the empty string, "gzip", and case variations of both names.
        let cases = [
            ("none", CompressionCodec::None),
            ("", CompressionCodec::None),
            ("gzip", CompressionCodec::gzip_default()),
            ("NONE", CompressionCodec::None),
            ("GZIP", CompressionCodec::gzip_default()),
            ("GzIp", CompressionCodec::gzip_default()),
        ];
        for (raw, expected) in cases {
            assert_eq!(
                parse_metadata_file_compression(&codec_props(raw)).unwrap(),
                expected,
                "unexpected codec for input '{raw}'"
            );
        }

        // A missing property falls back to the default codec.
        let props: HashMap<String, String> = HashMap::new();
        assert_eq!(
            parse_metadata_file_compression(&props).unwrap(),
            CompressionCodec::None
        );
    }

    #[test]
    fn test_parse_metadata_file_compression_invalid() {
        let invalid_codecs = ["lz4", "zstd", "snappy"];

        for codec in invalid_codecs {
            let err = parse_metadata_file_compression(&codec_props(codec)).unwrap_err();
            let err_msg = err.to_string();
            assert!(
                err_msg.contains("Invalid metadata compression codec"),
                "Expected error message to contain 'Invalid metadata compression codec', got: {err_msg}"
            );
            assert!(
                err_msg.contains("Only 'none' and 'gzip' are supported"),
                "Expected error message to contain supported codecs, got: {err_msg}"
            );
        }
    }

    #[test]
    fn test_cdc_disabled_by_default() {
        let props: HashMap<String, String> = HashMap::new();
        let tp = TableProperties::try_from(&props).unwrap();
        assert!(!tp.cdc_enabled);
    }

    #[test]
    fn test_cdc_enabled_via_flag() {
        let props = props_from(&[(TableProperties::PROPERTY_PARQUET_CDC_ENABLED, "true")]);
        let tp = TableProperties::try_from(&props).unwrap();
        assert!(tp.cdc_enabled);
        assert_eq!(tp.cdc_min_chunk_size, 256 * 1024);
        assert_eq!(tp.cdc_max_chunk_size, 1024 * 1024);
        assert_eq!(tp.cdc_norm_level, 0);
    }

    #[test]
    fn test_cdc_size_props_alone_do_not_enable() {
        let props = props_from(&[(
            TableProperties::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE,
            "262144",
        )]);
        let tp = TableProperties::try_from(&props).unwrap();
        assert!(!tp.cdc_enabled);
    }

    #[test]
    fn test_cdc_custom_values() {
        let props = props_from(&[
            (TableProperties::PROPERTY_PARQUET_CDC_ENABLED, "true"),
            (
                TableProperties::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE,
                "200000",
            ),
            (
                TableProperties::PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE,
                "900000",
            ),
            (TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL, "1"),
        ]);
        let tp = TableProperties::try_from(&props).unwrap();
        assert!(tp.cdc_enabled);
        assert_eq!(tp.cdc_min_chunk_size, 200000);
        assert_eq!(tp.cdc_max_chunk_size, 900000);
        assert_eq!(tp.cdc_norm_level, 1);
    }

    #[test]
    fn test_cdc_partial_override() {
        let props = props_from(&[
            (TableProperties::PROPERTY_PARQUET_CDC_ENABLED, "true"),
            (TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL, "2"),
        ]);
        let tp = TableProperties::try_from(&props).unwrap();
        assert!(tp.cdc_enabled);
        assert_eq!(tp.cdc_min_chunk_size, 256 * 1024);
        assert_eq!(tp.cdc_max_chunk_size, 1024 * 1024);
        assert_eq!(tp.cdc_norm_level, 2);
    }

    #[test]
    fn test_cdc_negative_norm_level() {
        let props = props_from(&[
            (TableProperties::PROPERTY_PARQUET_CDC_ENABLED, "true"),
            (TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL, "-2"),
        ]);
        let tp = TableProperties::try_from(&props).unwrap();
        assert_eq!(tp.cdc_norm_level, -2);
    }

    #[test]
    fn test_cdc_invalid_min_chunk_size() {
        let props = props_from(&[
            (TableProperties::PROPERTY_PARQUET_CDC_ENABLED, "true"),
            (
                TableProperties::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE,
                "not_a_number",
            ),
        ]);
        let err = TableProperties::try_from(&props).unwrap_err();
        assert!(
            err.to_string().contains(
                "Invalid value for write.parquet.content-defined-chunking.min-chunk-size"
            )
        );
    }

    #[test]
    fn test_cdc_invalid_norm_level() {
        let props = props_from(&[
            (TableProperties::PROPERTY_PARQUET_CDC_ENABLED, "true"),
            (
                TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL,
                "not_a_number",
            ),
        ]);
        let err = TableProperties::try_from(&props).unwrap_err();
        assert!(
            err.to_string()
                .contains("Invalid value for write.parquet.content-defined-chunking.norm-level")
        );
    }

    #[test]
    fn test_cdc_no_properties() {
        let props = props_from(&[("some.other.property", "value")]);
        let tp = TableProperties::try_from(&props).unwrap();
        assert!(!tp.cdc_enabled);
    }
}