iceberg/spec/
table_properties.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::Display;
20use std::str::FromStr;
21
22use crate::compression::CompressionCodec;
23use crate::error::{Error, ErrorKind, Result};
24
25// Helper function to parse a property from a HashMap
26// If the property is not found, use the default value
27fn parse_property<T: FromStr>(
28    properties: &HashMap<String, String>,
29    key: &str,
30    default: T,
31) -> Result<T>
32where
33    <T as FromStr>::Err: Display,
34{
35    properties.get(key).map_or(Ok(default), |value| {
36        value.parse::<T>().map_err(|e| {
37            Error::new(
38                ErrorKind::DataInvalid,
39                format!("Invalid value for {key}: {e}"),
40            )
41        })
42    })
43}
44
45/// Parse compression codec for metadata files from table properties.
46/// Retrieves the compression codec property, applies defaults, and parses the value.
47/// Only "none" (or empty string) and "gzip" are supported for metadata compression.
48///
49/// # Arguments
50///
51/// * `properties` - HashMap containing table properties
52///
53/// # Errors
54///
55/// Returns an error if the codec is not "none", "", or "gzip" (case-insensitive).
56/// Lz4 and Zstd are not supported for metadata file compression.
57pub(crate) fn parse_metadata_file_compression(
58    properties: &HashMap<String, String>,
59) -> Result<CompressionCodec> {
60    let value = properties
61        .get(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC)
62        .map(|s| s.as_str())
63        .unwrap_or(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT);
64
65    // Handle empty string as None
66    if value.is_empty() {
67        return Ok(CompressionCodec::None);
68    }
69
70    // Lowercase the value for case-insensitive parsing
71    let lowercase_value = value.to_lowercase();
72
73    // Use serde to parse the codec (which has rename_all = "lowercase")
74    let codec: CompressionCodec = serde_json::from_value(serde_json::Value::String(
75        lowercase_value,
76    ))
77    .map_err(|_| {
78        Error::new(
79            ErrorKind::DataInvalid,
80            format!(
81                "Invalid metadata compression codec: {value}. Only '{}' and '{}' are supported.",
82                CompressionCodec::None.name(),
83                CompressionCodec::gzip_default().name()
84            ),
85        )
86    })?;
87
88    // Validate that only None and Gzip are used for metadata
89    match codec {
90        CompressionCodec::None | CompressionCodec::Gzip(_) => Ok(codec),
91        _ => Err(Error::new(
92            ErrorKind::DataInvalid,
93            format!(
94                "Invalid metadata compression codec: {value}. Only '{}' and '{}' are supported for metadata files.",
95                CompressionCodec::None.name(),
96                CompressionCodec::gzip_default().name()
97            ),
98        )),
99    }
100}
101
102/// TableProperties that contains the properties of a table.
103#[derive(Debug)]
104pub struct TableProperties {
105    /// The number of times to retry a commit.
106    pub commit_num_retries: usize,
107    /// The minimum wait time between retries.
108    pub commit_min_retry_wait_ms: u64,
109    /// The maximum wait time between retries.
110    pub commit_max_retry_wait_ms: u64,
111    /// The total timeout for commit retries.
112    pub commit_total_retry_timeout_ms: u64,
113    /// The default format for files.
114    pub write_format_default: String,
115    /// The target file size for files.
116    pub write_target_file_size_bytes: usize,
117    /// Compression codec for metadata files (JSON)
118    pub metadata_compression_codec: CompressionCodec,
119    /// Whether to use `FanoutWriter` for partitioned tables.
120    pub write_datafusion_fanout_enabled: bool,
121    /// Whether garbage collection is enabled on drop.
122    /// When `false`, data files will not be deleted when a table is dropped.
123    pub gc_enabled: bool,
124}
125
126impl TableProperties {
127    /// Reserved table property for table format version.
128    ///
129    /// Iceberg will default a new table's format version to the latest stable and recommended
130    /// version. This reserved property keyword allows users to override the Iceberg format version of
131    /// the table metadata.
132    ///
133    /// If this table property exists when creating a table, the table will use the specified format
134    /// version. If a table updates this property, it will try to upgrade to the specified format
135    /// version.
136    pub const PROPERTY_FORMAT_VERSION: &str = "format-version";
137    /// Reserved table property for table UUID.
138    pub const PROPERTY_UUID: &str = "uuid";
139    /// Reserved table property for the total number of snapshots.
140    pub const PROPERTY_SNAPSHOT_COUNT: &str = "snapshot-count";
141    /// Reserved table property for current snapshot summary.
142    pub const PROPERTY_CURRENT_SNAPSHOT_SUMMARY: &str = "current-snapshot-summary";
143    /// Reserved table property for current snapshot id.
144    pub const PROPERTY_CURRENT_SNAPSHOT_ID: &str = "current-snapshot-id";
145    /// Reserved table property for current snapshot timestamp.
146    pub const PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP: &str = "current-snapshot-timestamp-ms";
147    /// Reserved table property for the JSON representation of current schema.
148    pub const PROPERTY_CURRENT_SCHEMA: &str = "current-schema";
149    /// Reserved table property for the JSON representation of current(default) partition spec.
150    pub const PROPERTY_DEFAULT_PARTITION_SPEC: &str = "default-partition-spec";
151    /// Reserved table property for the JSON representation of current(default) sort order.
152    pub const PROPERTY_DEFAULT_SORT_ORDER: &str = "default-sort-order";
153
154    /// Property key for max number of previous versions to keep.
155    pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX: &str =
156        "write.metadata.previous-versions-max";
157    /// Default value for max number of previous versions to keep.
158    pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100;
159
160    /// Property key for max number of partitions to keep summary stats for.
161    pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT: &str = "write.summary.partition-limit";
162    /// Default value for the max number of partitions to keep summary stats for.
163    pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT: u64 = 0;
164
165    /// Reserved Iceberg table properties list.
166    ///
167    /// Reserved table properties are only used to control behaviors when creating or updating a
168    /// table. The value of these properties are not persisted as a part of the table metadata.
169    pub const RESERVED_PROPERTIES: [&str; 9] = [
170        Self::PROPERTY_FORMAT_VERSION,
171        Self::PROPERTY_UUID,
172        Self::PROPERTY_SNAPSHOT_COUNT,
173        Self::PROPERTY_CURRENT_SNAPSHOT_ID,
174        Self::PROPERTY_CURRENT_SNAPSHOT_SUMMARY,
175        Self::PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP,
176        Self::PROPERTY_CURRENT_SCHEMA,
177        Self::PROPERTY_DEFAULT_PARTITION_SPEC,
178        Self::PROPERTY_DEFAULT_SORT_ORDER,
179    ];
180
181    /// Property key for number of commit retries.
182    pub const PROPERTY_COMMIT_NUM_RETRIES: &str = "commit.retry.num-retries";
183    /// Default value for number of commit retries.
184    pub const PROPERTY_COMMIT_NUM_RETRIES_DEFAULT: usize = 4;
185
186    /// Property key for minimum wait time (ms) between retries.
187    pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS: &str = "commit.retry.min-wait-ms";
188    /// Default value for minimum wait time (ms) between retries.
189    pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT: u64 = 100;
190
191    /// Property key for maximum wait time (ms) between retries.
192    pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS: &str = "commit.retry.max-wait-ms";
193    /// Default value for maximum wait time (ms) between retries.
194    pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT: u64 = 60 * 1000; // 1 minute
195
196    /// Property key for total maximum retry time (ms).
197    pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS: &str = "commit.retry.total-timeout-ms";
198    /// Default value for total maximum retry time (ms).
199    pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000; // 30 minutes
200
201    /// Default file format for data files
202    pub const PROPERTY_DEFAULT_FILE_FORMAT: &str = "write.format.default";
203    /// Default file format for delete files
204    pub const PROPERTY_DELETE_DEFAULT_FILE_FORMAT: &str = "write.delete.format.default";
205    /// Default value for data file format
206    pub const PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT: &str = "parquet";
207
208    /// Target file size for newly written files.
209    pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = "write.target-file-size-bytes";
210    /// Default target file size
211    pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 * 1024; // 512 MB
212
213    /// Compression codec for metadata files (JSON)
214    pub const PROPERTY_METADATA_COMPRESSION_CODEC: &str = "write.metadata.compression-codec";
215    /// Default metadata compression codec - uncompressed
216    pub const PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT: &str = "none";
217    /// Whether to use `FanoutWriter` for partitioned tables (handles unsorted data).
218    /// If false, uses `ClusteredWriter` (requires sorted data, more memory efficient).
219    pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED: &str = "write.datafusion.fanout.enabled";
220    /// Default value for fanout writer enabled
221    pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT: bool = true;
222
223    /// Property key for enabling garbage collection on drop.
224    /// When set to `false`, data files will not be deleted when a table is dropped.
225    /// Defaults to `true`.
226    pub const PROPERTY_GC_ENABLED: &str = "gc.enabled";
227    /// Default value for gc.enabled
228    pub const PROPERTY_GC_ENABLED_DEFAULT: bool = true;
229}
230
231impl TryFrom<&HashMap<String, String>> for TableProperties {
232    // parse by entry key or use default value
233    type Error = Error;
234
235    fn try_from(props: &HashMap<String, String>) -> Result<Self> {
236        Ok(TableProperties {
237            commit_num_retries: parse_property(
238                props,
239                TableProperties::PROPERTY_COMMIT_NUM_RETRIES,
240                TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT,
241            )?,
242            commit_min_retry_wait_ms: parse_property(
243                props,
244                TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS,
245                TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
246            )?,
247            commit_max_retry_wait_ms: parse_property(
248                props,
249                TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS,
250                TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
251            )?,
252            commit_total_retry_timeout_ms: parse_property(
253                props,
254                TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS,
255                TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
256            )?,
257            write_format_default: parse_property(
258                props,
259                TableProperties::PROPERTY_DEFAULT_FILE_FORMAT,
260                TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string(),
261            )?,
262            write_target_file_size_bytes: parse_property(
263                props,
264                TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES,
265                TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
266            )?,
267            metadata_compression_codec: parse_metadata_file_compression(props)?,
268            write_datafusion_fanout_enabled: parse_property(
269                props,
270                TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED,
271                TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT,
272            )?,
273            gc_enabled: parse_property(
274                props,
275                TableProperties::PROPERTY_GC_ENABLED,
276                TableProperties::PROPERTY_GC_ENABLED_DEFAULT,
277            )?,
278        })
279    }
280}
281
#[cfg(test)]
mod tests {
    use super::*;
    use crate::compression::CompressionCodec;

    /// Codec names that must be rejected for metadata file compression.
    const REJECTED_CODECS: [&str; 3] = ["lz4", "zstd", "snappy"];

    /// Builds a property map holding exactly one `key` -> `value` entry.
    fn single_prop(key: &str, value: &str) -> HashMap<String, String> {
        HashMap::from([(key.to_string(), value.to_string())])
    }

    /// Builds a property map that only sets the metadata compression codec.
    fn codec_props(value: &str) -> HashMap<String, String> {
        single_prop(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC, value)
    }

    /// Asserts that setting `key` to `value` makes the conversion fail with a message
    /// containing `expected_fragment`.
    fn assert_invalid(key: &str, value: &str, expected_fragment: &str) {
        let err = TableProperties::try_from(&single_prop(key, value)).unwrap_err();
        let err_msg = err.to_string();
        assert!(
            err_msg.contains(expected_fragment),
            "Expected error for {key}={value} to contain '{expected_fragment}', got: {err_msg}"
        );
    }

    #[test]
    fn test_table_properties_default() {
        let props = HashMap::new();
        let table_properties = TableProperties::try_from(&props).unwrap();
        assert_eq!(
            table_properties.commit_num_retries,
            TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT
        );
        assert_eq!(
            table_properties.commit_min_retry_wait_ms,
            TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT
        );
        assert_eq!(
            table_properties.commit_max_retry_wait_ms,
            TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT
        );
        assert_eq!(
            table_properties.commit_total_retry_timeout_ms,
            TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT
        );
        assert_eq!(
            table_properties.write_format_default,
            TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string()
        );
        assert_eq!(
            table_properties.write_target_file_size_bytes,
            TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
        );
        // The default codec string "none" maps to CompressionCodec::None.
        assert_eq!(
            table_properties.metadata_compression_codec,
            CompressionCodec::None
        );
        assert_eq!(
            table_properties.write_datafusion_fanout_enabled,
            TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT
        );
        assert_eq!(
            table_properties.gc_enabled,
            TableProperties::PROPERTY_GC_ENABLED_DEFAULT
        );
    }

    #[test]
    fn test_table_properties_compression() {
        let table_properties = TableProperties::try_from(&codec_props("gzip")).unwrap();
        assert_eq!(
            table_properties.metadata_compression_codec,
            CompressionCodec::gzip_default()
        );
    }

    #[test]
    fn test_table_properties_compression_none() {
        let table_properties = TableProperties::try_from(&codec_props("none")).unwrap();
        assert_eq!(
            table_properties.metadata_compression_codec,
            CompressionCodec::None
        );
    }

    #[test]
    fn test_table_properties_compression_case_insensitive() {
        // Uppercase, mixed case, and uppercase "NONE" must all be accepted.
        let cases = [
            ("GZIP", CompressionCodec::gzip_default()),
            ("GzIp", CompressionCodec::gzip_default()),
            ("NONE", CompressionCodec::None),
        ];

        for (raw, expected) in cases {
            let table_properties = TableProperties::try_from(&codec_props(raw)).unwrap();
            assert_eq!(
                table_properties.metadata_compression_codec, expected,
                "unexpected codec for value '{raw}'"
            );
        }
    }

    #[test]
    fn test_table_properties_valid() {
        let props = HashMap::from([
            (
                TableProperties::PROPERTY_COMMIT_NUM_RETRIES.to_string(),
                "10".to_string(),
            ),
            (
                TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS.to_string(),
                "5".to_string(),
            ),
            (
                TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS.to_string(),
                "20".to_string(),
            ),
            (
                TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS.to_string(),
                "1000".to_string(),
            ),
            (
                TableProperties::PROPERTY_DEFAULT_FILE_FORMAT.to_string(),
                "avro".to_string(),
            ),
            (
                TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(),
                "512".to_string(),
            ),
            (
                TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED.to_string(),
                "false".to_string(),
            ),
            (
                TableProperties::PROPERTY_GC_ENABLED.to_string(),
                "false".to_string(),
            ),
        ]);
        let table_properties = TableProperties::try_from(&props).unwrap();
        assert_eq!(table_properties.commit_num_retries, 10);
        assert_eq!(table_properties.commit_min_retry_wait_ms, 5);
        assert_eq!(table_properties.commit_max_retry_wait_ms, 20);
        assert_eq!(table_properties.commit_total_retry_timeout_ms, 1000);
        assert_eq!(table_properties.write_format_default, "avro".to_string());
        assert_eq!(table_properties.write_target_file_size_bytes, 512);
        assert!(!table_properties.write_datafusion_fanout_enabled);
        assert!(!table_properties.gc_enabled);
    }

    #[test]
    fn test_table_properties_invalid() {
        // Numeric properties: a non-numeric value surfaces the integer parse error.
        assert_invalid(
            TableProperties::PROPERTY_COMMIT_NUM_RETRIES,
            "abc",
            "Invalid value for commit.retry.num-retries: invalid digit found in string",
        );
        assert_invalid(
            TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS,
            "abc",
            "Invalid value for commit.retry.min-wait-ms: invalid digit found in string",
        );
        assert_invalid(
            TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS,
            "abc",
            "Invalid value for commit.retry.max-wait-ms: invalid digit found in string",
        );
        assert_invalid(
            TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS,
            "abc",
            "Invalid value for commit.retry.total-timeout-ms: invalid digit found in string",
        );
        assert_invalid(
            TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES,
            "abc",
            "Invalid value for write.target-file-size-bytes: invalid digit found in string",
        );

        // Boolean properties: only the key is checked, not the parse error text.
        assert_invalid(
            TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED,
            "notabool",
            "Invalid value for write.datafusion.fanout.enabled",
        );
        assert_invalid(
            TableProperties::PROPERTY_GC_ENABLED,
            "notabool",
            "Invalid value for gc.enabled",
        );
    }

    #[test]
    fn test_table_properties_compression_invalid_rejected() {
        for codec in REJECTED_CODECS {
            let err = TableProperties::try_from(&codec_props(codec)).unwrap_err();
            let err_msg = err.to_string();
            assert!(
                err_msg.contains(&format!("Invalid metadata compression codec: {codec}")),
                "Expected error message to contain codec '{codec}', got: {err_msg}"
            );
            assert!(
                err_msg.contains("Only 'none' and 'gzip' are supported"),
                "Expected error message to contain supported codecs, got: {err_msg}"
            );
        }
    }

    #[test]
    fn test_parse_metadata_file_compression_valid() {
        // Covers exact names, the empty string, and case-insensitive spellings.
        let cases = [
            ("none", CompressionCodec::None),
            ("", CompressionCodec::None),
            ("gzip", CompressionCodec::gzip_default()),
            ("NONE", CompressionCodec::None),
            ("GZIP", CompressionCodec::gzip_default()),
            ("GzIp", CompressionCodec::gzip_default()),
        ];

        for (raw, expected) in cases {
            assert_eq!(
                parse_metadata_file_compression(&codec_props(raw)).unwrap(),
                expected,
                "unexpected codec for value '{raw}'"
            );
        }

        // A missing property falls back to the default codec.
        let props = HashMap::new();
        assert_eq!(
            parse_metadata_file_compression(&props).unwrap(),
            CompressionCodec::None
        );
    }

    #[test]
    fn test_parse_metadata_file_compression_invalid() {
        for codec in REJECTED_CODECS {
            let err = parse_metadata_file_compression(&codec_props(codec)).unwrap_err();
            let err_msg = err.to_string();
            assert!(
                err_msg.contains("Invalid metadata compression codec"),
                "Expected error message to contain 'Invalid metadata compression codec', got: {err_msg}"
            );
            assert!(
                err_msg.contains("Only 'none' and 'gzip' are supported"),
                "Expected error message to contain supported codecs, got: {err_msg}"
            );
        }
    }
}