1use std::collections::HashMap;
19use std::fmt::Display;
20use std::str::FromStr;
21
22use crate::compression::CompressionCodec;
23use crate::error::{Error, ErrorKind, Result};
24
25fn parse_property<T: FromStr>(
26 properties: &HashMap<String, String>,
27 key: &str,
28 default: T,
29) -> Result<T>
30where
31 <T as FromStr>::Err: Display,
32{
33 properties.get(key).map_or(Ok(default), |value| {
34 value.parse::<T>().map_err(|e| {
35 Error::new(
36 ErrorKind::DataInvalid,
37 format!("Invalid value for {key}: {e}"),
38 )
39 })
40 })
41}
42
43pub(crate) fn parse_metadata_file_compression(
56 properties: &HashMap<String, String>,
57) -> Result<CompressionCodec> {
58 let value = properties
59 .get(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC)
60 .map(|s| s.as_str())
61 .unwrap_or(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT);
62
63 if value.is_empty() {
65 return Ok(CompressionCodec::None);
66 }
67
68 let lowercase_value = value.to_lowercase();
70
71 let codec: CompressionCodec = serde_json::from_value(serde_json::Value::String(
73 lowercase_value,
74 ))
75 .map_err(|_| {
76 Error::new(
77 ErrorKind::DataInvalid,
78 format!(
79 "Invalid metadata compression codec: {value}. Only '{}' and '{}' are supported.",
80 CompressionCodec::None.name(),
81 CompressionCodec::gzip_default().name()
82 ),
83 )
84 })?;
85
86 match codec {
88 CompressionCodec::None | CompressionCodec::Gzip(_) => Ok(codec),
89 _ => Err(Error::new(
90 ErrorKind::DataInvalid,
91 format!(
92 "Invalid metadata compression codec: {value}. Only '{}' and '{}' are supported for metadata files.",
93 CompressionCodec::None.name(),
94 CompressionCodec::gzip_default().name()
95 ),
96 )),
97 }
98}
99
/// Typed view of the table properties this module knows how to parse.
///
/// Instances are built from a `HashMap<String, String>` through the
/// `TryFrom` implementation in this file; every field falls back to the
/// matching `PROPERTY_*_DEFAULT` constant when its key is absent, except
/// `encryption_key_id`, which is `None` when absent.
#[derive(Debug)]
pub struct TableProperties {
    /// Parsed from [`TableProperties::PROPERTY_COMMIT_NUM_RETRIES`].
    pub commit_num_retries: usize,
    /// Parsed from [`TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS`].
    pub commit_min_retry_wait_ms: u64,
    /// Parsed from [`TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS`].
    pub commit_max_retry_wait_ms: u64,
    /// Parsed from [`TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS`].
    pub commit_total_retry_timeout_ms: u64,
    /// Parsed from [`TableProperties::PROPERTY_DEFAULT_FILE_FORMAT`]; stored
    /// as the raw string without validation in this file.
    pub write_format_default: String,
    /// Parsed from [`TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES`].
    pub write_target_file_size_bytes: usize,
    /// Resolved by `parse_metadata_file_compression` from
    /// [`TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC`].
    pub metadata_compression_codec: CompressionCodec,
    /// Parsed from [`TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED`].
    pub write_datafusion_fanout_enabled: bool,
    /// Parsed from [`TableProperties::PROPERTY_GC_ENABLED`].
    pub gc_enabled: bool,
    /// Parsed from [`TableProperties::PROPERTY_MAX_SNAPSHOT_AGE_MS`].
    pub max_snapshot_age_ms: i64,
    /// Parsed from [`TableProperties::PROPERTY_MIN_SNAPSHOTS_TO_KEEP`].
    pub min_snapshots_to_keep: usize,
    /// Parsed from [`TableProperties::PROPERTY_MAX_REF_AGE_MS`].
    pub max_ref_age_ms: i64,
    /// Parsed from [`TableProperties::PROPERTY_PARQUET_CDC_ENABLED`].
    pub cdc_enabled: bool,
    /// Parsed from [`TableProperties::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE`].
    /// NOTE(review): presumably a byte count — confirm against the writer.
    pub cdc_min_chunk_size: usize,
    /// Parsed from [`TableProperties::PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE`].
    /// NOTE(review): presumably a byte count — confirm against the writer.
    pub cdc_max_chunk_size: usize,
    /// Parsed from [`TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL`];
    /// signed, so negative values are accepted by the parser.
    pub cdc_norm_level: i32,
    /// Cloned from [`TableProperties::PROPERTY_ENCRYPTION_KEY_ID`]; `None`
    /// when the key is not present.
    pub encryption_key_id: Option<String>,
    /// Parsed from [`TableProperties::PROPERTY_ENCRYPTION_DATA_KEY_LENGTH`].
    /// NOTE(review): unit (bytes vs. bits) is not shown here — confirm.
    pub encryption_data_key_length: usize,
}
143
/// Property keys and their default values.
///
/// Constants ending in `_DEFAULT` hold the fallback for the key declared
/// immediately before them.
impl TableProperties {
    /// Key `format-version`; listed in [`Self::RESERVED_PROPERTIES`].
    pub const PROPERTY_FORMAT_VERSION: &str = "format-version";
    /// Key `uuid`; listed in [`Self::RESERVED_PROPERTIES`].
    pub const PROPERTY_UUID: &str = "uuid";
    /// Key `snapshot-count`; listed in [`Self::RESERVED_PROPERTIES`].
    pub const PROPERTY_SNAPSHOT_COUNT: &str = "snapshot-count";
    /// Key `current-snapshot-summary`; listed in [`Self::RESERVED_PROPERTIES`].
    pub const PROPERTY_CURRENT_SNAPSHOT_SUMMARY: &str = "current-snapshot-summary";
    /// Key `current-snapshot-id`; listed in [`Self::RESERVED_PROPERTIES`].
    pub const PROPERTY_CURRENT_SNAPSHOT_ID: &str = "current-snapshot-id";
    /// Key `current-snapshot-timestamp-ms`; listed in [`Self::RESERVED_PROPERTIES`].
    pub const PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP: &str = "current-snapshot-timestamp-ms";
    /// Key `current-schema`; listed in [`Self::RESERVED_PROPERTIES`].
    pub const PROPERTY_CURRENT_SCHEMA: &str = "current-schema";
    /// Key `default-partition-spec`; listed in [`Self::RESERVED_PROPERTIES`].
    pub const PROPERTY_DEFAULT_PARTITION_SPEC: &str = "default-partition-spec";
    /// Key `default-sort-order`; listed in [`Self::RESERVED_PROPERTIES`].
    pub const PROPERTY_DEFAULT_SORT_ORDER: &str = "default-sort-order";

    /// Key `write.metadata.previous-versions-max`. Not read by the
    /// conversion code visible in this file.
    pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX: &str =
        "write.metadata.previous-versions-max";
    /// Default for [`Self::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX`].
    pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100;

    /// Key `write.summary.partition-limit`. Not read by the conversion code
    /// visible in this file.
    pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT: &str = "write.summary.partition-limit";
    /// Default for [`Self::PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT`].
    pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT: u64 = 0;

    /// The nine keys grouped as reserved.
    ///
    /// NOTE(review): how callers enforce the reservation is not visible in
    /// this file — confirm at the use sites.
    pub const RESERVED_PROPERTIES: [&str; 9] = [
        Self::PROPERTY_FORMAT_VERSION,
        Self::PROPERTY_UUID,
        Self::PROPERTY_SNAPSHOT_COUNT,
        Self::PROPERTY_CURRENT_SNAPSHOT_ID,
        Self::PROPERTY_CURRENT_SNAPSHOT_SUMMARY,
        Self::PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP,
        Self::PROPERTY_CURRENT_SCHEMA,
        Self::PROPERTY_DEFAULT_PARTITION_SPEC,
        Self::PROPERTY_DEFAULT_SORT_ORDER,
    ];

    /// Key `commit.retry.num-retries`; parsed into `commit_num_retries`.
    pub const PROPERTY_COMMIT_NUM_RETRIES: &str = "commit.retry.num-retries";
    /// Default for [`Self::PROPERTY_COMMIT_NUM_RETRIES`].
    pub const PROPERTY_COMMIT_NUM_RETRIES_DEFAULT: usize = 4;

    /// Key `commit.retry.min-wait-ms`; parsed into `commit_min_retry_wait_ms`.
    pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS: &str = "commit.retry.min-wait-ms";
    /// Default for [`Self::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS`].
    pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT: u64 = 100;

    /// Key `commit.retry.max-wait-ms`; parsed into `commit_max_retry_wait_ms`.
    pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS: &str = "commit.retry.max-wait-ms";
    /// Default for [`Self::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS`]: 60 000, i.e.
    /// one minute given the key's `-ms` suffix.
    pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT: u64 = 60 * 1000;
    /// Key `commit.retry.total-timeout-ms`; parsed into
    /// `commit_total_retry_timeout_ms`.
    pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS: &str = "commit.retry.total-timeout-ms";
    /// Default for [`Self::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS`]: 1 800 000,
    /// i.e. thirty minutes given the key's `-ms` suffix.
    pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000;
    /// Key `write.format.default`; parsed into `write_format_default`.
    pub const PROPERTY_DEFAULT_FILE_FORMAT: &str = "write.format.default";
    /// Key `write.delete.format.default`. Not read by the conversion code
    /// visible in this file.
    pub const PROPERTY_DELETE_DEFAULT_FILE_FORMAT: &str = "write.delete.format.default";
    /// Default for [`Self::PROPERTY_DEFAULT_FILE_FORMAT`].
    pub const PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT: &str = "parquet";

    /// Key `write.target-file-size-bytes`; parsed into
    /// `write_target_file_size_bytes`.
    pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = "write.target-file-size-bytes";
    /// Default for [`Self::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES`]:
    /// 512 * 1024 * 1024 bytes (512 MiB).
    pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 * 1024;
    /// Key `write.metadata.compression-codec`; resolved by
    /// `parse_metadata_file_compression`.
    pub const PROPERTY_METADATA_COMPRESSION_CODEC: &str = "write.metadata.compression-codec";
    /// Default for [`Self::PROPERTY_METADATA_COMPRESSION_CODEC`].
    pub const PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT: &str = "none";
    /// Key `write.datafusion.fanout.enabled`; parsed into
    /// `write_datafusion_fanout_enabled`.
    pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED: &str = "write.datafusion.fanout.enabled";
    /// Default for [`Self::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED`].
    pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT: bool = true;

    /// Key `gc.enabled`; parsed into `gc_enabled`.
    pub const PROPERTY_GC_ENABLED: &str = "gc.enabled";
    /// Default for [`Self::PROPERTY_GC_ENABLED`].
    pub const PROPERTY_GC_ENABLED_DEFAULT: bool = true;

    /// Key `history.expire.max-snapshot-age-ms`; parsed into
    /// `max_snapshot_age_ms`.
    pub const PROPERTY_MAX_SNAPSHOT_AGE_MS: &str = "history.expire.max-snapshot-age-ms";
    /// Default for [`Self::PROPERTY_MAX_SNAPSHOT_AGE_MS`]: five days
    /// expressed in milliseconds.
    pub const PROPERTY_MAX_SNAPSHOT_AGE_MS_DEFAULT: i64 = 5 * 24 * 60 * 60 * 1000;
    /// Key `history.expire.min-snapshots-to-keep`; parsed into
    /// `min_snapshots_to_keep`.
    pub const PROPERTY_MIN_SNAPSHOTS_TO_KEEP: &str = "history.expire.min-snapshots-to-keep";
    /// Default for [`Self::PROPERTY_MIN_SNAPSHOTS_TO_KEEP`].
    pub const PROPERTY_MIN_SNAPSHOTS_TO_KEEP_DEFAULT: usize = 1;
    /// Key `history.expire.max-ref-age-ms`; parsed into `max_ref_age_ms`.
    pub const PROPERTY_MAX_REF_AGE_MS: &str = "history.expire.max-ref-age-ms";
    /// Default for [`Self::PROPERTY_MAX_REF_AGE_MS`]: the largest `i64`.
    pub const PROPERTY_MAX_REF_AGE_MS_DEFAULT: i64 = i64::MAX;

    /// Key `write.parquet.content-defined-chunking.enabled`; parsed into
    /// `cdc_enabled`.
    pub const PROPERTY_PARQUET_CDC_ENABLED: &str = "write.parquet.content-defined-chunking.enabled";
    /// Default for [`Self::PROPERTY_PARQUET_CDC_ENABLED`].
    pub const PROPERTY_PARQUET_CDC_ENABLED_DEFAULT: bool = false;
    /// Key `write.parquet.content-defined-chunking.min-chunk-size`; parsed
    /// into `cdc_min_chunk_size`.
    pub const PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE: &str =
        "write.parquet.content-defined-chunking.min-chunk-size";
    /// Default for [`Self::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE`]: 262 144.
    pub const PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE_DEFAULT: usize = 256 * 1024;
    /// Key `write.parquet.content-defined-chunking.max-chunk-size`; parsed
    /// into `cdc_max_chunk_size`.
    pub const PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE: &str =
        "write.parquet.content-defined-chunking.max-chunk-size";
    /// Default for [`Self::PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE`]: 1 048 576.
    pub const PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE_DEFAULT: usize = 1024 * 1024;
    /// Key `write.parquet.content-defined-chunking.norm-level`; parsed into
    /// `cdc_norm_level`.
    pub const PROPERTY_PARQUET_CDC_NORM_LEVEL: &str =
        "write.parquet.content-defined-chunking.norm-level";
    /// Default for [`Self::PROPERTY_PARQUET_CDC_NORM_LEVEL`].
    pub const PROPERTY_PARQUET_CDC_NORM_LEVEL_DEFAULT: i32 = 0;

    /// Key `encryption.key-id`; has no default constant, so the parsed field
    /// is `None` when the key is absent.
    pub const PROPERTY_ENCRYPTION_KEY_ID: &str = "encryption.key-id";

    /// Key `encryption.data-key-length`; parsed into
    /// `encryption_data_key_length`.
    pub const PROPERTY_ENCRYPTION_DATA_KEY_LENGTH: &str = "encryption.data-key-length";
    /// Default for [`Self::PROPERTY_ENCRYPTION_DATA_KEY_LENGTH`].
    pub const PROPERTY_ENCRYPTION_DATA_KEY_LENGTH_DEFAULT: usize = 16;
}
290
291impl TryFrom<&HashMap<String, String>> for TableProperties {
292 type Error = Error;
294
295 fn try_from(props: &HashMap<String, String>) -> Result<Self> {
296 Ok(TableProperties {
297 commit_num_retries: parse_property(
298 props,
299 TableProperties::PROPERTY_COMMIT_NUM_RETRIES,
300 TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT,
301 )?,
302 commit_min_retry_wait_ms: parse_property(
303 props,
304 TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS,
305 TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
306 )?,
307 commit_max_retry_wait_ms: parse_property(
308 props,
309 TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS,
310 TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
311 )?,
312 commit_total_retry_timeout_ms: parse_property(
313 props,
314 TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS,
315 TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
316 )?,
317 write_format_default: parse_property(
318 props,
319 TableProperties::PROPERTY_DEFAULT_FILE_FORMAT,
320 TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string(),
321 )?,
322 write_target_file_size_bytes: parse_property(
323 props,
324 TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES,
325 TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
326 )?,
327 metadata_compression_codec: parse_metadata_file_compression(props)?,
328 write_datafusion_fanout_enabled: parse_property(
329 props,
330 TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED,
331 TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT,
332 )?,
333 gc_enabled: parse_property(
334 props,
335 TableProperties::PROPERTY_GC_ENABLED,
336 TableProperties::PROPERTY_GC_ENABLED_DEFAULT,
337 )?,
338 max_snapshot_age_ms: parse_property(
339 props,
340 TableProperties::PROPERTY_MAX_SNAPSHOT_AGE_MS,
341 TableProperties::PROPERTY_MAX_SNAPSHOT_AGE_MS_DEFAULT,
342 )?,
343 min_snapshots_to_keep: parse_property(
344 props,
345 TableProperties::PROPERTY_MIN_SNAPSHOTS_TO_KEEP,
346 TableProperties::PROPERTY_MIN_SNAPSHOTS_TO_KEEP_DEFAULT,
347 )?,
348 max_ref_age_ms: parse_property(
349 props,
350 TableProperties::PROPERTY_MAX_REF_AGE_MS,
351 TableProperties::PROPERTY_MAX_REF_AGE_MS_DEFAULT,
352 )?,
353 cdc_enabled: parse_property(
354 props,
355 TableProperties::PROPERTY_PARQUET_CDC_ENABLED,
356 TableProperties::PROPERTY_PARQUET_CDC_ENABLED_DEFAULT,
357 )?,
358 cdc_min_chunk_size: parse_property(
359 props,
360 TableProperties::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE,
361 TableProperties::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE_DEFAULT,
362 )?,
363 cdc_max_chunk_size: parse_property(
364 props,
365 TableProperties::PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE,
366 TableProperties::PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE_DEFAULT,
367 )?,
368 cdc_norm_level: parse_property(
369 props,
370 TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL,
371 TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL_DEFAULT,
372 )?,
373 encryption_key_id: props
374 .get(TableProperties::PROPERTY_ENCRYPTION_KEY_ID)
375 .cloned(),
376 encryption_data_key_length: parse_property(
377 props,
378 TableProperties::PROPERTY_ENCRYPTION_DATA_KEY_LENGTH,
379 TableProperties::PROPERTY_ENCRYPTION_DATA_KEY_LENGTH_DEFAULT,
380 )?,
381 })
382 }
383}
384
#[cfg(test)]
mod tests {
    use super::*;
    use crate::compression::CompressionCodec;

    /// Builds an owned property map from borrowed key/value pairs.
    fn props_of(entries: &[(&str, &str)]) -> HashMap<String, String> {
        entries
            .iter()
            .map(|&(key, value)| (key.to_owned(), value.to_owned()))
            .collect()
    }

    /// Converts `entries` into [`TableProperties`], panicking if that fails.
    fn parse_ok(entries: &[(&str, &str)]) -> TableProperties {
        TableProperties::try_from(&props_of(entries)).unwrap()
    }

    /// Returns the rendered conversion error for `entries`, panicking on success.
    fn parse_err(entries: &[(&str, &str)]) -> String {
        TableProperties::try_from(&props_of(entries))
            .unwrap_err()
            .to_string()
    }

    /// Resolves the metadata codec for a map holding only the codec property.
    fn codec_for(value: &str) -> Result<CompressionCodec> {
        parse_metadata_file_compression(&props_of(&[(
            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC,
            value,
        )]))
    }

    #[test]
    fn test_table_properties_default() {
        let defaults = parse_ok(&[]);

        assert_eq!(
            defaults.commit_num_retries,
            TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT
        );
        assert_eq!(
            defaults.commit_min_retry_wait_ms,
            TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT
        );
        assert_eq!(
            defaults.commit_max_retry_wait_ms,
            TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT
        );
        assert_eq!(
            defaults.commit_total_retry_timeout_ms,
            TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT
        );
        assert_eq!(
            defaults.write_format_default,
            TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string()
        );
        assert_eq!(
            defaults.write_target_file_size_bytes,
            TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
        );
        assert_eq!(defaults.metadata_compression_codec, CompressionCodec::None);
        assert_eq!(
            defaults.write_datafusion_fanout_enabled,
            TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT
        );
        assert_eq!(
            defaults.gc_enabled,
            TableProperties::PROPERTY_GC_ENABLED_DEFAULT
        );
        assert_eq!(
            defaults.max_snapshot_age_ms,
            TableProperties::PROPERTY_MAX_SNAPSHOT_AGE_MS_DEFAULT
        );
        assert_eq!(
            defaults.min_snapshots_to_keep,
            TableProperties::PROPERTY_MIN_SNAPSHOTS_TO_KEEP_DEFAULT
        );
        assert_eq!(
            defaults.max_ref_age_ms,
            TableProperties::PROPERTY_MAX_REF_AGE_MS_DEFAULT
        );
        assert_eq!(
            defaults.cdc_enabled,
            TableProperties::PROPERTY_PARQUET_CDC_ENABLED_DEFAULT
        );
        assert_eq!(
            defaults.cdc_min_chunk_size,
            TableProperties::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE_DEFAULT
        );
        assert_eq!(
            defaults.cdc_max_chunk_size,
            TableProperties::PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE_DEFAULT
        );
        assert_eq!(
            defaults.cdc_norm_level,
            TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL_DEFAULT
        );
        assert!(defaults.encryption_key_id.is_none());
        assert_eq!(
            defaults.encryption_data_key_length,
            TableProperties::PROPERTY_ENCRYPTION_DATA_KEY_LENGTH_DEFAULT
        );
    }

    #[test]
    fn test_table_properties_history_expire_overrides() {
        let parsed = parse_ok(&[
            (TableProperties::PROPERTY_MAX_SNAPSHOT_AGE_MS, "1234"),
            (TableProperties::PROPERTY_MIN_SNAPSHOTS_TO_KEEP, "7"),
            (TableProperties::PROPERTY_MAX_REF_AGE_MS, "5678"),
        ]);
        assert_eq!(parsed.max_snapshot_age_ms, 1234);
        assert_eq!(parsed.min_snapshots_to_keep, 7);
        assert_eq!(parsed.max_ref_age_ms, 5678);
    }

    #[test]
    fn test_table_properties_compression() {
        let parsed = parse_ok(&[(
            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC,
            "gzip",
        )]);
        assert_eq!(
            parsed.metadata_compression_codec,
            CompressionCodec::gzip_default()
        );
    }

    #[test]
    fn test_table_properties_compression_none() {
        let parsed = parse_ok(&[(
            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC,
            "none",
        )]);
        assert_eq!(parsed.metadata_compression_codec, CompressionCodec::None);
    }

    #[test]
    fn test_table_properties_compression_case_insensitive() {
        let cases = [
            ("GZIP", CompressionCodec::gzip_default()),
            ("GzIp", CompressionCodec::gzip_default()),
            ("NONE", CompressionCodec::None),
        ];

        for (spelling, expected) in cases {
            let parsed = parse_ok(&[(
                TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC,
                spelling,
            )]);
            assert_eq!(parsed.metadata_compression_codec, expected);
        }
    }

    #[test]
    fn test_table_properties_valid() {
        let parsed = parse_ok(&[
            (TableProperties::PROPERTY_COMMIT_NUM_RETRIES, "10"),
            (TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS, "20"),
            (TableProperties::PROPERTY_DEFAULT_FILE_FORMAT, "avro"),
            (TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES, "512"),
            (TableProperties::PROPERTY_GC_ENABLED, "false"),
        ]);
        assert_eq!(parsed.commit_num_retries, 10);
        assert_eq!(parsed.commit_max_retry_wait_ms, 20);
        assert_eq!(parsed.write_format_default, "avro".to_string());
        assert_eq!(parsed.write_target_file_size_bytes, 512);
        assert!(!parsed.gc_enabled);
    }

    #[test]
    fn test_table_properties_invalid() {
        // (property key, bad value, fragment the error must contain)
        let cases = [
            (
                TableProperties::PROPERTY_COMMIT_NUM_RETRIES,
                "abc",
                "Invalid value for commit.retry.num-retries: invalid digit found in string",
            ),
            (
                TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS,
                "abc",
                "Invalid value for commit.retry.min-wait-ms: invalid digit found in string",
            ),
            (
                TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS,
                "abc",
                "Invalid value for commit.retry.max-wait-ms: invalid digit found in string",
            ),
            (
                TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES,
                "abc",
                "Invalid value for write.target-file-size-bytes: invalid digit found in string",
            ),
            (
                TableProperties::PROPERTY_GC_ENABLED,
                "notabool",
                "Invalid value for gc.enabled",
            ),
        ];

        for (key, bad_value, expected_fragment) in cases {
            let message = parse_err(&[(key, bad_value)]);
            assert!(
                message.contains(expected_fragment),
                "Expected error for '{key}' to contain '{expected_fragment}', got: {message}"
            );
        }
    }

    #[test]
    fn test_table_properties_compression_invalid_rejected() {
        for codec in ["lz4", "zstd", "snappy"] {
            let err_msg = parse_err(&[(
                TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC,
                codec,
            )]);
            assert!(
                err_msg.contains(&format!("Invalid metadata compression codec: {codec}")),
                "Expected error message to contain codec '{codec}', got: {err_msg}"
            );
            assert!(
                err_msg.contains("Only 'none' and 'gzip' are supported"),
                "Expected error message to contain supported codecs, got: {err_msg}"
            );
        }
    }

    #[test]
    fn test_parse_metadata_file_compression_valid() {
        let cases = [
            ("none", CompressionCodec::None),
            // An explicitly empty value means "no compression".
            ("", CompressionCodec::None),
            ("gzip", CompressionCodec::gzip_default()),
            ("NONE", CompressionCodec::None),
            ("GZIP", CompressionCodec::gzip_default()),
            ("GzIp", CompressionCodec::gzip_default()),
        ];

        for (value, expected) in cases {
            assert_eq!(codec_for(value).unwrap(), expected);
        }

        // A missing property falls back to the default codec.
        assert_eq!(
            parse_metadata_file_compression(&HashMap::new()).unwrap(),
            CompressionCodec::None
        );
    }

    #[test]
    fn test_parse_metadata_file_compression_invalid() {
        for codec in ["lz4", "zstd", "snappy"] {
            let err_msg = codec_for(codec).unwrap_err().to_string();
            assert!(
                err_msg.contains("Invalid metadata compression codec"),
                "Expected error message to contain 'Invalid metadata compression codec', got: {err_msg}"
            );
            assert!(
                err_msg.contains("Only 'none' and 'gzip' are supported"),
                "Expected error message to contain supported codecs, got: {err_msg}"
            );
        }
    }

    #[test]
    fn test_cdc_disabled_by_default() {
        assert!(!parse_ok(&[]).cdc_enabled);
    }

    #[test]
    fn test_cdc_enabled_via_flag() {
        let tp = parse_ok(&[(TableProperties::PROPERTY_PARQUET_CDC_ENABLED, "true")]);
        assert!(tp.cdc_enabled);
        assert_eq!(tp.cdc_min_chunk_size, 256 * 1024);
        assert_eq!(tp.cdc_max_chunk_size, 1024 * 1024);
        assert_eq!(tp.cdc_norm_level, 0);
    }

    #[test]
    fn test_cdc_size_props_alone_do_not_enable() {
        let tp = parse_ok(&[(
            TableProperties::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE,
            "262144",
        )]);
        assert!(!tp.cdc_enabled);
    }

    #[test]
    fn test_cdc_custom_values() {
        let tp = parse_ok(&[
            (TableProperties::PROPERTY_PARQUET_CDC_ENABLED, "true"),
            (TableProperties::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE, "200000"),
            (TableProperties::PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE, "900000"),
            (TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL, "1"),
        ]);
        assert!(tp.cdc_enabled);
        assert_eq!(tp.cdc_min_chunk_size, 200000);
        assert_eq!(tp.cdc_max_chunk_size, 900000);
        assert_eq!(tp.cdc_norm_level, 1);
    }

    #[test]
    fn test_cdc_partial_override() {
        let tp = parse_ok(&[
            (TableProperties::PROPERTY_PARQUET_CDC_ENABLED, "true"),
            (TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL, "2"),
        ]);
        assert!(tp.cdc_enabled);
        assert_eq!(tp.cdc_min_chunk_size, 256 * 1024);
        assert_eq!(tp.cdc_max_chunk_size, 1024 * 1024);
        assert_eq!(tp.cdc_norm_level, 2);
    }

    #[test]
    fn test_cdc_negative_norm_level() {
        let tp = parse_ok(&[
            (TableProperties::PROPERTY_PARQUET_CDC_ENABLED, "true"),
            (TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL, "-2"),
        ]);
        assert_eq!(tp.cdc_norm_level, -2);
    }

    #[test]
    fn test_cdc_invalid_min_chunk_size() {
        let message = parse_err(&[
            (TableProperties::PROPERTY_PARQUET_CDC_ENABLED, "true"),
            (
                TableProperties::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE,
                "not_a_number",
            ),
        ]);
        assert!(
            message
                .contains("Invalid value for write.parquet.content-defined-chunking.min-chunk-size")
        );
    }

    #[test]
    fn test_cdc_invalid_norm_level() {
        let message = parse_err(&[
            (TableProperties::PROPERTY_PARQUET_CDC_ENABLED, "true"),
            (
                TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL,
                "not_a_number",
            ),
        ]);
        assert!(
            message.contains("Invalid value for write.parquet.content-defined-chunking.norm-level")
        );
    }

    #[test]
    fn test_cdc_no_properties() {
        // Unrelated keys must neither fail the conversion nor enable CDC.
        let tp = parse_ok(&[("some.other.property", "value")]);
        assert!(!tp.cdc_enabled);
    }
}