iceberg/spec/
table_properties.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::Display;
20use std::str::FromStr;
21
22use crate::compression::CompressionCodec;
23use crate::error::{Error, ErrorKind, Result};
24
25// Helper function to parse a property from a HashMap
26// If the property is not found, use the default value
27fn parse_property<T: FromStr>(
28    properties: &HashMap<String, String>,
29    key: &str,
30    default: T,
31) -> Result<T>
32where
33    <T as FromStr>::Err: Display,
34{
35    properties.get(key).map_or(Ok(default), |value| {
36        value.parse::<T>().map_err(|e| {
37            Error::new(
38                ErrorKind::DataInvalid,
39                format!("Invalid value for {key}: {e}"),
40            )
41        })
42    })
43}
44
45/// Parse compression codec for metadata files from table properties.
46/// Retrieves the compression codec property, applies defaults, and parses the value.
47/// Only "none" (or empty string) and "gzip" are supported for metadata compression.
48///
49/// # Arguments
50///
51/// * `properties` - HashMap containing table properties
52///
53/// # Errors
54///
55/// Returns an error if the codec is not "none", "", or "gzip" (case-insensitive).
56/// Lz4 and Zstd are not supported for metadata file compression.
57pub(crate) fn parse_metadata_file_compression(
58    properties: &HashMap<String, String>,
59) -> Result<CompressionCodec> {
60    let value = properties
61        .get(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC)
62        .map(|s| s.as_str())
63        .unwrap_or(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT);
64
65    // Handle empty string as None
66    if value.is_empty() {
67        return Ok(CompressionCodec::None);
68    }
69
70    // Lowercase the value for case-insensitive parsing
71    let lowercase_value = value.to_lowercase();
72
73    // Use serde to parse the codec (which has rename_all = "lowercase")
74    let codec: CompressionCodec = serde_json::from_value(serde_json::Value::String(
75        lowercase_value,
76    ))
77    .map_err(|_| {
78        Error::new(
79            ErrorKind::DataInvalid,
80            format!(
81                "Invalid metadata compression codec: {value}. Only 'none' and 'gzip' are supported."
82            ),
83        )
84    })?;
85
86    // Validate that only None and Gzip are used for metadata
87    match codec {
88        CompressionCodec::None | CompressionCodec::Gzip => Ok(codec),
89        CompressionCodec::Lz4 | CompressionCodec::Zstd => Err(Error::new(
90            ErrorKind::DataInvalid,
91            format!(
92                "Invalid metadata compression codec: {value}. Only 'none' and 'gzip' are supported for metadata files."
93            ),
94        )),
95    }
96}
97
98/// TableProperties that contains the properties of a table.
99#[derive(Debug)]
100pub struct TableProperties {
101    /// The number of times to retry a commit.
102    pub commit_num_retries: usize,
103    /// The minimum wait time between retries.
104    pub commit_min_retry_wait_ms: u64,
105    /// The maximum wait time between retries.
106    pub commit_max_retry_wait_ms: u64,
107    /// The total timeout for commit retries.
108    pub commit_total_retry_timeout_ms: u64,
109    /// The default format for files.
110    pub write_format_default: String,
111    /// The target file size for files.
112    pub write_target_file_size_bytes: usize,
113    /// Compression codec for metadata files (JSON)
114    pub metadata_compression_codec: CompressionCodec,
115    /// Whether to use `FanoutWriter` for partitioned tables.
116    pub write_datafusion_fanout_enabled: bool,
117}
118
119impl TableProperties {
120    /// Reserved table property for table format version.
121    ///
122    /// Iceberg will default a new table's format version to the latest stable and recommended
123    /// version. This reserved property keyword allows users to override the Iceberg format version of
124    /// the table metadata.
125    ///
126    /// If this table property exists when creating a table, the table will use the specified format
127    /// version. If a table updates this property, it will try to upgrade to the specified format
128    /// version.
129    pub const PROPERTY_FORMAT_VERSION: &str = "format-version";
130    /// Reserved table property for table UUID.
131    pub const PROPERTY_UUID: &str = "uuid";
132    /// Reserved table property for the total number of snapshots.
133    pub const PROPERTY_SNAPSHOT_COUNT: &str = "snapshot-count";
134    /// Reserved table property for current snapshot summary.
135    pub const PROPERTY_CURRENT_SNAPSHOT_SUMMARY: &str = "current-snapshot-summary";
136    /// Reserved table property for current snapshot id.
137    pub const PROPERTY_CURRENT_SNAPSHOT_ID: &str = "current-snapshot-id";
138    /// Reserved table property for current snapshot timestamp.
139    pub const PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP: &str = "current-snapshot-timestamp-ms";
140    /// Reserved table property for the JSON representation of current schema.
141    pub const PROPERTY_CURRENT_SCHEMA: &str = "current-schema";
142    /// Reserved table property for the JSON representation of current(default) partition spec.
143    pub const PROPERTY_DEFAULT_PARTITION_SPEC: &str = "default-partition-spec";
144    /// Reserved table property for the JSON representation of current(default) sort order.
145    pub const PROPERTY_DEFAULT_SORT_ORDER: &str = "default-sort-order";
146
147    /// Property key for max number of previous versions to keep.
148    pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX: &str =
149        "write.metadata.previous-versions-max";
150    /// Default value for max number of previous versions to keep.
151    pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100;
152
153    /// Property key for max number of partitions to keep summary stats for.
154    pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT: &str = "write.summary.partition-limit";
155    /// Default value for the max number of partitions to keep summary stats for.
156    pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT: u64 = 0;
157
158    /// Reserved Iceberg table properties list.
159    ///
160    /// Reserved table properties are only used to control behaviors when creating or updating a
161    /// table. The value of these properties are not persisted as a part of the table metadata.
162    pub const RESERVED_PROPERTIES: [&str; 9] = [
163        Self::PROPERTY_FORMAT_VERSION,
164        Self::PROPERTY_UUID,
165        Self::PROPERTY_SNAPSHOT_COUNT,
166        Self::PROPERTY_CURRENT_SNAPSHOT_ID,
167        Self::PROPERTY_CURRENT_SNAPSHOT_SUMMARY,
168        Self::PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP,
169        Self::PROPERTY_CURRENT_SCHEMA,
170        Self::PROPERTY_DEFAULT_PARTITION_SPEC,
171        Self::PROPERTY_DEFAULT_SORT_ORDER,
172    ];
173
174    /// Property key for number of commit retries.
175    pub const PROPERTY_COMMIT_NUM_RETRIES: &str = "commit.retry.num-retries";
176    /// Default value for number of commit retries.
177    pub const PROPERTY_COMMIT_NUM_RETRIES_DEFAULT: usize = 4;
178
179    /// Property key for minimum wait time (ms) between retries.
180    pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS: &str = "commit.retry.min-wait-ms";
181    /// Default value for minimum wait time (ms) between retries.
182    pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT: u64 = 100;
183
184    /// Property key for maximum wait time (ms) between retries.
185    pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS: &str = "commit.retry.max-wait-ms";
186    /// Default value for maximum wait time (ms) between retries.
187    pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT: u64 = 60 * 1000; // 1 minute
188
189    /// Property key for total maximum retry time (ms).
190    pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS: &str = "commit.retry.total-timeout-ms";
191    /// Default value for total maximum retry time (ms).
192    pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000; // 30 minutes
193
194    /// Default file format for data files
195    pub const PROPERTY_DEFAULT_FILE_FORMAT: &str = "write.format.default";
196    /// Default file format for delete files
197    pub const PROPERTY_DELETE_DEFAULT_FILE_FORMAT: &str = "write.delete.format.default";
198    /// Default value for data file format
199    pub const PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT: &str = "parquet";
200
201    /// Target file size for newly written files.
202    pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = "write.target-file-size-bytes";
203    /// Default target file size
204    pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 * 1024; // 512 MB
205
206    /// Compression codec for metadata files (JSON)
207    pub const PROPERTY_METADATA_COMPRESSION_CODEC: &str = "write.metadata.compression-codec";
208    /// Default metadata compression codec - uncompressed
209    pub const PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT: &str = "none";
210    /// Whether to use `FanoutWriter` for partitioned tables (handles unsorted data).
211    /// If false, uses `ClusteredWriter` (requires sorted data, more memory efficient).
212    pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED: &str = "write.datafusion.fanout.enabled";
213    /// Default value for fanout writer enabled
214    pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT: bool = true;
215}
216
217impl TryFrom<&HashMap<String, String>> for TableProperties {
218    // parse by entry key or use default value
219    type Error = Error;
220
221    fn try_from(props: &HashMap<String, String>) -> Result<Self> {
222        Ok(TableProperties {
223            commit_num_retries: parse_property(
224                props,
225                TableProperties::PROPERTY_COMMIT_NUM_RETRIES,
226                TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT,
227            )?,
228            commit_min_retry_wait_ms: parse_property(
229                props,
230                TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS,
231                TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
232            )?,
233            commit_max_retry_wait_ms: parse_property(
234                props,
235                TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS,
236                TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
237            )?,
238            commit_total_retry_timeout_ms: parse_property(
239                props,
240                TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS,
241                TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
242            )?,
243            write_format_default: parse_property(
244                props,
245                TableProperties::PROPERTY_DEFAULT_FILE_FORMAT,
246                TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string(),
247            )?,
248            write_target_file_size_bytes: parse_property(
249                props,
250                TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES,
251                TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
252            )?,
253            metadata_compression_codec: parse_metadata_file_compression(props)?,
254            write_datafusion_fanout_enabled: parse_property(
255                props,
256                TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED,
257                TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT,
258            )?,
259        })
260    }
261}
262
263#[cfg(test)]
264mod tests {
265    use super::*;
266    use crate::compression::CompressionCodec;
267
268    #[test]
269    fn test_table_properties_default() {
270        let props = HashMap::new();
271        let table_properties = TableProperties::try_from(&props).unwrap();
272        assert_eq!(
273            table_properties.commit_num_retries,
274            TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT
275        );
276        assert_eq!(
277            table_properties.commit_min_retry_wait_ms,
278            TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT
279        );
280        assert_eq!(
281            table_properties.commit_max_retry_wait_ms,
282            TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT
283        );
284        assert_eq!(
285            table_properties.write_format_default,
286            TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string()
287        );
288        assert_eq!(
289            table_properties.write_target_file_size_bytes,
290            TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
291        );
292        // Test compression defaults (none means CompressionCodec::None)
293        assert_eq!(
294            table_properties.metadata_compression_codec,
295            CompressionCodec::None
296        );
297    }
298
299    #[test]
300    fn test_table_properties_compression() {
301        let props = HashMap::from([(
302            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
303            "gzip".to_string(),
304        )]);
305        let table_properties = TableProperties::try_from(&props).unwrap();
306        assert_eq!(
307            table_properties.metadata_compression_codec,
308            CompressionCodec::Gzip
309        );
310    }
311
312    #[test]
313    fn test_table_properties_compression_none() {
314        let props = HashMap::from([(
315            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
316            "none".to_string(),
317        )]);
318        let table_properties = TableProperties::try_from(&props).unwrap();
319        assert_eq!(
320            table_properties.metadata_compression_codec,
321            CompressionCodec::None
322        );
323    }
324
325    #[test]
326    fn test_table_properties_compression_case_insensitive() {
327        // Test uppercase
328        let props_upper = HashMap::from([(
329            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
330            "GZIP".to_string(),
331        )]);
332        let table_properties = TableProperties::try_from(&props_upper).unwrap();
333        assert_eq!(
334            table_properties.metadata_compression_codec,
335            CompressionCodec::Gzip
336        );
337
338        // Test mixed case
339        let props_mixed = HashMap::from([(
340            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
341            "GzIp".to_string(),
342        )]);
343        let table_properties = TableProperties::try_from(&props_mixed).unwrap();
344        assert_eq!(
345            table_properties.metadata_compression_codec,
346            CompressionCodec::Gzip
347        );
348
349        // Test "NONE" should also be case-insensitive
350        let props_none_upper = HashMap::from([(
351            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
352            "NONE".to_string(),
353        )]);
354        let table_properties = TableProperties::try_from(&props_none_upper).unwrap();
355        assert_eq!(
356            table_properties.metadata_compression_codec,
357            CompressionCodec::None
358        );
359    }
360
361    #[test]
362    fn test_table_properties_valid() {
363        let props = HashMap::from([
364            (
365                TableProperties::PROPERTY_COMMIT_NUM_RETRIES.to_string(),
366                "10".to_string(),
367            ),
368            (
369                TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS.to_string(),
370                "20".to_string(),
371            ),
372            (
373                TableProperties::PROPERTY_DEFAULT_FILE_FORMAT.to_string(),
374                "avro".to_string(),
375            ),
376            (
377                TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(),
378                "512".to_string(),
379            ),
380        ]);
381        let table_properties = TableProperties::try_from(&props).unwrap();
382        assert_eq!(table_properties.commit_num_retries, 10);
383        assert_eq!(table_properties.commit_max_retry_wait_ms, 20);
384        assert_eq!(table_properties.write_format_default, "avro".to_string());
385        assert_eq!(table_properties.write_target_file_size_bytes, 512);
386    }
387
388    #[test]
389    fn test_table_properties_invalid() {
390        let invalid_retries = HashMap::from([(
391            TableProperties::PROPERTY_COMMIT_NUM_RETRIES.to_string(),
392            "abc".to_string(),
393        )]);
394
395        let table_properties = TableProperties::try_from(&invalid_retries).unwrap_err();
396        assert!(
397            table_properties.to_string().contains(
398                "Invalid value for commit.retry.num-retries: invalid digit found in string"
399            )
400        );
401
402        let invalid_min_wait = HashMap::from([(
403            TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS.to_string(),
404            "abc".to_string(),
405        )]);
406        let table_properties = TableProperties::try_from(&invalid_min_wait).unwrap_err();
407        assert!(
408            table_properties.to_string().contains(
409                "Invalid value for commit.retry.min-wait-ms: invalid digit found in string"
410            )
411        );
412
413        let invalid_max_wait = HashMap::from([(
414            TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS.to_string(),
415            "abc".to_string(),
416        )]);
417        let table_properties = TableProperties::try_from(&invalid_max_wait).unwrap_err();
418        assert!(
419            table_properties.to_string().contains(
420                "Invalid value for commit.retry.max-wait-ms: invalid digit found in string"
421            )
422        );
423
424        let invalid_target_size = HashMap::from([(
425            TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(),
426            "abc".to_string(),
427        )]);
428        let table_properties = TableProperties::try_from(&invalid_target_size).unwrap_err();
429        assert!(table_properties.to_string().contains(
430            "Invalid value for write.target-file-size-bytes: invalid digit found in string"
431        ));
432    }
433
434    #[test]
435    fn test_table_properties_compression_invalid_rejected() {
436        let invalid_codecs = ["lz4", "zstd", "snappy"];
437
438        for codec in invalid_codecs {
439            let props = HashMap::from([(
440                TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
441                codec.to_string(),
442            )]);
443            let err = TableProperties::try_from(&props).unwrap_err();
444            let err_msg = err.to_string();
445            assert!(
446                err_msg.contains(&format!("Invalid metadata compression codec: {codec}")),
447                "Expected error message to contain codec '{codec}', got: {err_msg}"
448            );
449            assert!(
450                err_msg.contains("Only 'none' and 'gzip' are supported"),
451                "Expected error message to contain supported codecs, got: {err_msg}"
452            );
453        }
454    }
455
456    #[test]
457    fn test_parse_metadata_file_compression_valid() {
458        // Test with "none"
459        let props = HashMap::from([(
460            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
461            "none".to_string(),
462        )]);
463        assert_eq!(
464            parse_metadata_file_compression(&props).unwrap(),
465            CompressionCodec::None
466        );
467
468        // Test with empty string
469        let props = HashMap::from([(
470            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
471            "".to_string(),
472        )]);
473        assert_eq!(
474            parse_metadata_file_compression(&props).unwrap(),
475            CompressionCodec::None
476        );
477
478        // Test with "gzip"
479        let props = HashMap::from([(
480            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
481            "gzip".to_string(),
482        )]);
483        assert_eq!(
484            parse_metadata_file_compression(&props).unwrap(),
485            CompressionCodec::Gzip
486        );
487
488        // Test case insensitivity - "NONE"
489        let props = HashMap::from([(
490            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
491            "NONE".to_string(),
492        )]);
493        assert_eq!(
494            parse_metadata_file_compression(&props).unwrap(),
495            CompressionCodec::None
496        );
497
498        // Test case insensitivity - "GZIP"
499        let props = HashMap::from([(
500            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
501            "GZIP".to_string(),
502        )]);
503        assert_eq!(
504            parse_metadata_file_compression(&props).unwrap(),
505            CompressionCodec::Gzip
506        );
507
508        // Test case insensitivity - "GzIp"
509        let props = HashMap::from([(
510            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
511            "GzIp".to_string(),
512        )]);
513        assert_eq!(
514            parse_metadata_file_compression(&props).unwrap(),
515            CompressionCodec::Gzip
516        );
517
518        // Test default when property is missing
519        let props = HashMap::new();
520        assert_eq!(
521            parse_metadata_file_compression(&props).unwrap(),
522            CompressionCodec::None
523        );
524    }
525
526    #[test]
527    fn test_parse_metadata_file_compression_invalid() {
528        let invalid_codecs = ["lz4", "zstd", "snappy"];
529
530        for codec in invalid_codecs {
531            let props = HashMap::from([(
532                TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
533                codec.to_string(),
534            )]);
535            let err = parse_metadata_file_compression(&props).unwrap_err();
536            let err_msg = err.to_string();
537            assert!(
538                err_msg.contains("Invalid metadata compression codec"),
539                "Expected error message to contain 'Invalid metadata compression codec', got: {err_msg}"
540            );
541            assert!(
542                err_msg.contains("Only 'none' and 'gzip' are supported"),
543                "Expected error message to contain supported codecs, got: {err_msg}"
544            );
545        }
546    }
547}