1use std::collections::HashMap;
19use std::fmt::Display;
20use std::str::FromStr;
21
22use crate::compression::CompressionCodec;
23use crate::error::{Error, ErrorKind, Result};
24
25fn parse_property<T: FromStr>(
28 properties: &HashMap<String, String>,
29 key: &str,
30 default: T,
31) -> Result<T>
32where
33 <T as FromStr>::Err: Display,
34{
35 properties.get(key).map_or(Ok(default), |value| {
36 value.parse::<T>().map_err(|e| {
37 Error::new(
38 ErrorKind::DataInvalid,
39 format!("Invalid value for {key}: {e}"),
40 )
41 })
42 })
43}
44
45pub(crate) fn parse_metadata_file_compression(
58 properties: &HashMap<String, String>,
59) -> Result<CompressionCodec> {
60 let value = properties
61 .get(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC)
62 .map(|s| s.as_str())
63 .unwrap_or(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT);
64
65 if value.is_empty() {
67 return Ok(CompressionCodec::None);
68 }
69
70 let lowercase_value = value.to_lowercase();
72
73 let codec: CompressionCodec = serde_json::from_value(serde_json::Value::String(
75 lowercase_value,
76 ))
77 .map_err(|_| {
78 Error::new(
79 ErrorKind::DataInvalid,
80 format!(
81 "Invalid metadata compression codec: {value}. Only '{}' and '{}' are supported.",
82 CompressionCodec::None.name(),
83 CompressionCodec::gzip_default().name()
84 ),
85 )
86 })?;
87
88 match codec {
90 CompressionCodec::None | CompressionCodec::Gzip(_) => Ok(codec),
91 _ => Err(Error::new(
92 ErrorKind::DataInvalid,
93 format!(
94 "Invalid metadata compression codec: {value}. Only '{}' and '{}' are supported for metadata files.",
95 CompressionCodec::None.name(),
96 CompressionCodec::gzip_default().name()
97 ),
98 )),
99 }
100}
101
102#[derive(Debug)]
104pub struct TableProperties {
105 pub commit_num_retries: usize,
107 pub commit_min_retry_wait_ms: u64,
109 pub commit_max_retry_wait_ms: u64,
111 pub commit_total_retry_timeout_ms: u64,
113 pub write_format_default: String,
115 pub write_target_file_size_bytes: usize,
117 pub metadata_compression_codec: CompressionCodec,
119 pub write_datafusion_fanout_enabled: bool,
121 pub gc_enabled: bool,
124}
125
126impl TableProperties {
127 pub const PROPERTY_FORMAT_VERSION: &str = "format-version";
137 pub const PROPERTY_UUID: &str = "uuid";
139 pub const PROPERTY_SNAPSHOT_COUNT: &str = "snapshot-count";
141 pub const PROPERTY_CURRENT_SNAPSHOT_SUMMARY: &str = "current-snapshot-summary";
143 pub const PROPERTY_CURRENT_SNAPSHOT_ID: &str = "current-snapshot-id";
145 pub const PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP: &str = "current-snapshot-timestamp-ms";
147 pub const PROPERTY_CURRENT_SCHEMA: &str = "current-schema";
149 pub const PROPERTY_DEFAULT_PARTITION_SPEC: &str = "default-partition-spec";
151 pub const PROPERTY_DEFAULT_SORT_ORDER: &str = "default-sort-order";
153
154 pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX: &str =
156 "write.metadata.previous-versions-max";
157 pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100;
159
160 pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT: &str = "write.summary.partition-limit";
162 pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT: u64 = 0;
164
165 pub const RESERVED_PROPERTIES: [&str; 9] = [
170 Self::PROPERTY_FORMAT_VERSION,
171 Self::PROPERTY_UUID,
172 Self::PROPERTY_SNAPSHOT_COUNT,
173 Self::PROPERTY_CURRENT_SNAPSHOT_ID,
174 Self::PROPERTY_CURRENT_SNAPSHOT_SUMMARY,
175 Self::PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP,
176 Self::PROPERTY_CURRENT_SCHEMA,
177 Self::PROPERTY_DEFAULT_PARTITION_SPEC,
178 Self::PROPERTY_DEFAULT_SORT_ORDER,
179 ];
180
181 pub const PROPERTY_COMMIT_NUM_RETRIES: &str = "commit.retry.num-retries";
183 pub const PROPERTY_COMMIT_NUM_RETRIES_DEFAULT: usize = 4;
185
186 pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS: &str = "commit.retry.min-wait-ms";
188 pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT: u64 = 100;
190
191 pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS: &str = "commit.retry.max-wait-ms";
193 pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT: u64 = 60 * 1000; pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS: &str = "commit.retry.total-timeout-ms";
198 pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000; pub const PROPERTY_DEFAULT_FILE_FORMAT: &str = "write.format.default";
203 pub const PROPERTY_DELETE_DEFAULT_FILE_FORMAT: &str = "write.delete.format.default";
205 pub const PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT: &str = "parquet";
207
208 pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = "write.target-file-size-bytes";
210 pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 * 1024; pub const PROPERTY_METADATA_COMPRESSION_CODEC: &str = "write.metadata.compression-codec";
215 pub const PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT: &str = "none";
217 pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED: &str = "write.datafusion.fanout.enabled";
220 pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT: bool = true;
222
223 pub const PROPERTY_GC_ENABLED: &str = "gc.enabled";
227 pub const PROPERTY_GC_ENABLED_DEFAULT: bool = true;
229}
230
231impl TryFrom<&HashMap<String, String>> for TableProperties {
232 type Error = Error;
234
235 fn try_from(props: &HashMap<String, String>) -> Result<Self> {
236 Ok(TableProperties {
237 commit_num_retries: parse_property(
238 props,
239 TableProperties::PROPERTY_COMMIT_NUM_RETRIES,
240 TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT,
241 )?,
242 commit_min_retry_wait_ms: parse_property(
243 props,
244 TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS,
245 TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
246 )?,
247 commit_max_retry_wait_ms: parse_property(
248 props,
249 TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS,
250 TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
251 )?,
252 commit_total_retry_timeout_ms: parse_property(
253 props,
254 TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS,
255 TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
256 )?,
257 write_format_default: parse_property(
258 props,
259 TableProperties::PROPERTY_DEFAULT_FILE_FORMAT,
260 TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string(),
261 )?,
262 write_target_file_size_bytes: parse_property(
263 props,
264 TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES,
265 TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
266 )?,
267 metadata_compression_codec: parse_metadata_file_compression(props)?,
268 write_datafusion_fanout_enabled: parse_property(
269 props,
270 TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED,
271 TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT,
272 )?,
273 gc_enabled: parse_property(
274 props,
275 TableProperties::PROPERTY_GC_ENABLED,
276 TableProperties::PROPERTY_GC_ENABLED_DEFAULT,
277 )?,
278 })
279 }
280}
281
#[cfg(test)]
mod tests {
    use super::*;
    use crate::compression::CompressionCodec;

    /// Builds a property map containing the single entry `key = value`.
    fn props_with(key: &str, value: &str) -> HashMap<String, String> {
        HashMap::from([(key.to_string(), value.to_string())])
    }

    #[test]
    fn test_table_properties_default() {
        let props = HashMap::new();
        let table_properties = TableProperties::try_from(&props).unwrap();
        assert_eq!(
            table_properties.commit_num_retries,
            TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT
        );
        assert_eq!(
            table_properties.commit_min_retry_wait_ms,
            TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT
        );
        assert_eq!(
            table_properties.commit_max_retry_wait_ms,
            TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT
        );
        assert_eq!(
            table_properties.commit_total_retry_timeout_ms,
            TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT
        );
        assert_eq!(
            table_properties.write_format_default,
            TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string()
        );
        assert_eq!(
            table_properties.write_target_file_size_bytes,
            TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
        );
        // The default codec name ("none") must resolve to no compression.
        assert_eq!(
            table_properties.metadata_compression_codec,
            CompressionCodec::None
        );
        assert_eq!(
            table_properties.write_datafusion_fanout_enabled,
            TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT
        );
        assert_eq!(
            table_properties.gc_enabled,
            TableProperties::PROPERTY_GC_ENABLED_DEFAULT
        );
    }

    #[test]
    fn test_table_properties_compression() {
        let props = props_with(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC, "gzip");
        let table_properties = TableProperties::try_from(&props).unwrap();
        assert_eq!(
            table_properties.metadata_compression_codec,
            CompressionCodec::gzip_default()
        );
    }

    #[test]
    fn test_table_properties_compression_none() {
        let props = props_with(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC, "none");
        let table_properties = TableProperties::try_from(&props).unwrap();
        assert_eq!(
            table_properties.metadata_compression_codec,
            CompressionCodec::None
        );
    }

    #[test]
    fn test_table_properties_compression_case_insensitive() {
        let cases = [
            ("GZIP", CompressionCodec::gzip_default()),
            ("GzIp", CompressionCodec::gzip_default()),
            ("NONE", CompressionCodec::None),
        ];
        for (input, expected) in cases {
            let props = props_with(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC, input);
            let table_properties = TableProperties::try_from(&props).unwrap();
            assert_eq!(
                table_properties.metadata_compression_codec, expected,
                "unexpected codec for input {input:?}"
            );
        }
    }

    #[test]
    fn test_table_properties_valid() {
        let props = HashMap::from([
            (
                TableProperties::PROPERTY_COMMIT_NUM_RETRIES.to_string(),
                "10".to_string(),
            ),
            (
                TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS.to_string(),
                "5".to_string(),
            ),
            (
                TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS.to_string(),
                "20".to_string(),
            ),
            (
                TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS.to_string(),
                "1000".to_string(),
            ),
            (
                TableProperties::PROPERTY_DEFAULT_FILE_FORMAT.to_string(),
                "avro".to_string(),
            ),
            (
                TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(),
                "512".to_string(),
            ),
            (
                TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED.to_string(),
                "false".to_string(),
            ),
            (
                TableProperties::PROPERTY_GC_ENABLED.to_string(),
                "false".to_string(),
            ),
        ]);
        let table_properties = TableProperties::try_from(&props).unwrap();
        assert_eq!(table_properties.commit_num_retries, 10);
        assert_eq!(table_properties.commit_min_retry_wait_ms, 5);
        assert_eq!(table_properties.commit_max_retry_wait_ms, 20);
        assert_eq!(table_properties.commit_total_retry_timeout_ms, 1000);
        assert_eq!(table_properties.write_format_default, "avro".to_string());
        assert_eq!(table_properties.write_target_file_size_bytes, 512);
        assert!(!table_properties.write_datafusion_fanout_enabled);
        assert!(!table_properties.gc_enabled);
    }

    #[test]
    fn test_table_properties_invalid() {
        // (key, unparsable value, substring expected in the error message)
        let cases = [
            (
                TableProperties::PROPERTY_COMMIT_NUM_RETRIES,
                "abc",
                "Invalid value for commit.retry.num-retries: invalid digit found in string",
            ),
            (
                TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS,
                "abc",
                "Invalid value for commit.retry.min-wait-ms: invalid digit found in string",
            ),
            (
                TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS,
                "abc",
                "Invalid value for commit.retry.max-wait-ms: invalid digit found in string",
            ),
            (
                TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS,
                "abc",
                "Invalid value for commit.retry.total-timeout-ms: invalid digit found in string",
            ),
            (
                TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES,
                "abc",
                "Invalid value for write.target-file-size-bytes: invalid digit found in string",
            ),
            (
                TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED,
                "notabool",
                "Invalid value for write.datafusion.fanout.enabled",
            ),
            (
                TableProperties::PROPERTY_GC_ENABLED,
                "notabool",
                "Invalid value for gc.enabled",
            ),
        ];

        for (key, value, expected) in cases {
            let err = TableProperties::try_from(&props_with(key, value)).unwrap_err();
            let err_msg = err.to_string();
            assert!(
                err_msg.contains(expected),
                "Expected error for {key}={value} to contain '{expected}', got: {err_msg}"
            );
        }
    }

    #[test]
    fn test_table_properties_compression_invalid_rejected() {
        let invalid_codecs = ["lz4", "zstd", "snappy"];

        for codec in invalid_codecs {
            let props = props_with(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC, codec);
            let err = TableProperties::try_from(&props).unwrap_err();
            let err_msg = err.to_string();
            assert!(
                err_msg.contains(&format!("Invalid metadata compression codec: {codec}")),
                "Expected error message to contain codec '{codec}', got: {err_msg}"
            );
            assert!(
                err_msg.contains("Only 'none' and 'gzip' are supported"),
                "Expected error message to contain supported codecs, got: {err_msg}"
            );
        }
    }

    #[test]
    fn test_parse_metadata_file_compression_valid() {
        let cases = [
            ("none", CompressionCodec::None),
            // An explicitly empty value means "no compression".
            ("", CompressionCodec::None),
            ("gzip", CompressionCodec::gzip_default()),
            ("NONE", CompressionCodec::None),
            ("GZIP", CompressionCodec::gzip_default()),
            ("GzIp", CompressionCodec::gzip_default()),
        ];
        for (input, expected) in cases {
            let props = props_with(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC, input);
            assert_eq!(
                parse_metadata_file_compression(&props).unwrap(),
                expected,
                "unexpected codec for input {input:?}"
            );
        }

        // A missing key falls back to the default codec ("none").
        let props = HashMap::new();
        assert_eq!(
            parse_metadata_file_compression(&props).unwrap(),
            CompressionCodec::None
        );
    }

    #[test]
    fn test_parse_metadata_file_compression_invalid() {
        let invalid_codecs = ["lz4", "zstd", "snappy"];

        for codec in invalid_codecs {
            let props = props_with(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC, codec);
            let err = parse_metadata_file_compression(&props).unwrap_err();
            let err_msg = err.to_string();
            assert!(
                err_msg.contains("Invalid metadata compression codec"),
                "Expected error message to contain 'Invalid metadata compression codec', got: {err_msg}"
            );
            assert!(
                err_msg.contains("Only 'none' and 'gzip' are supported"),
                "Expected error message to contain supported codecs, got: {err_msg}"
            );
        }
    }
}