1use std::collections::HashMap;
19use std::fmt::Display;
20use std::str::FromStr;
21
22use crate::compression::CompressionCodec;
23use crate::error::{Error, ErrorKind, Result};
24
25fn parse_property<T: FromStr>(
26 properties: &HashMap<String, String>,
27 key: &str,
28 default: T,
29) -> Result<T>
30where
31 <T as FromStr>::Err: Display,
32{
33 properties.get(key).map_or(Ok(default), |value| {
34 value.parse::<T>().map_err(|e| {
35 Error::new(
36 ErrorKind::DataInvalid,
37 format!("Invalid value for {key}: {e}"),
38 )
39 })
40 })
41}
42
43pub(crate) fn parse_metadata_file_compression(
56 properties: &HashMap<String, String>,
57) -> Result<CompressionCodec> {
58 let value = properties
59 .get(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC)
60 .map(|s| s.as_str())
61 .unwrap_or(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT);
62
63 if value.is_empty() {
65 return Ok(CompressionCodec::None);
66 }
67
68 let lowercase_value = value.to_lowercase();
70
71 let codec: CompressionCodec = serde_json::from_value(serde_json::Value::String(
73 lowercase_value,
74 ))
75 .map_err(|_| {
76 Error::new(
77 ErrorKind::DataInvalid,
78 format!(
79 "Invalid metadata compression codec: {value}. Only '{}' and '{}' are supported.",
80 CompressionCodec::None.name(),
81 CompressionCodec::gzip_default().name()
82 ),
83 )
84 })?;
85
86 match codec {
88 CompressionCodec::None | CompressionCodec::Gzip(_) => Ok(codec),
89 _ => Err(Error::new(
90 ErrorKind::DataInvalid,
91 format!(
92 "Invalid metadata compression codec: {value}. Only '{}' and '{}' are supported for metadata files.",
93 CompressionCodec::None.name(),
94 CompressionCodec::gzip_default().name()
95 ),
96 )),
97 }
98}
99
100#[derive(Debug)]
102pub struct TableProperties {
103 pub commit_num_retries: usize,
105 pub commit_min_retry_wait_ms: u64,
107 pub commit_max_retry_wait_ms: u64,
109 pub commit_total_retry_timeout_ms: u64,
111 pub write_format_default: String,
113 pub write_target_file_size_bytes: usize,
115 pub metadata_compression_codec: CompressionCodec,
117 pub write_datafusion_fanout_enabled: bool,
119 pub gc_enabled: bool,
122 pub cdc_enabled: bool,
125 pub cdc_min_chunk_size: usize,
127 pub cdc_max_chunk_size: usize,
129 pub cdc_norm_level: i32,
131}
132
133impl TableProperties {
134 pub const PROPERTY_FORMAT_VERSION: &str = "format-version";
144 pub const PROPERTY_UUID: &str = "uuid";
146 pub const PROPERTY_SNAPSHOT_COUNT: &str = "snapshot-count";
148 pub const PROPERTY_CURRENT_SNAPSHOT_SUMMARY: &str = "current-snapshot-summary";
150 pub const PROPERTY_CURRENT_SNAPSHOT_ID: &str = "current-snapshot-id";
152 pub const PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP: &str = "current-snapshot-timestamp-ms";
154 pub const PROPERTY_CURRENT_SCHEMA: &str = "current-schema";
156 pub const PROPERTY_DEFAULT_PARTITION_SPEC: &str = "default-partition-spec";
158 pub const PROPERTY_DEFAULT_SORT_ORDER: &str = "default-sort-order";
160
161 pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX: &str =
163 "write.metadata.previous-versions-max";
164 pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100;
166
167 pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT: &str = "write.summary.partition-limit";
169 pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT: u64 = 0;
171
172 pub const RESERVED_PROPERTIES: [&str; 9] = [
177 Self::PROPERTY_FORMAT_VERSION,
178 Self::PROPERTY_UUID,
179 Self::PROPERTY_SNAPSHOT_COUNT,
180 Self::PROPERTY_CURRENT_SNAPSHOT_ID,
181 Self::PROPERTY_CURRENT_SNAPSHOT_SUMMARY,
182 Self::PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP,
183 Self::PROPERTY_CURRENT_SCHEMA,
184 Self::PROPERTY_DEFAULT_PARTITION_SPEC,
185 Self::PROPERTY_DEFAULT_SORT_ORDER,
186 ];
187
188 pub const PROPERTY_COMMIT_NUM_RETRIES: &str = "commit.retry.num-retries";
190 pub const PROPERTY_COMMIT_NUM_RETRIES_DEFAULT: usize = 4;
192
193 pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS: &str = "commit.retry.min-wait-ms";
195 pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT: u64 = 100;
197
198 pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS: &str = "commit.retry.max-wait-ms";
200 pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT: u64 = 60 * 1000; pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS: &str = "commit.retry.total-timeout-ms";
205 pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000; pub const PROPERTY_DEFAULT_FILE_FORMAT: &str = "write.format.default";
210 pub const PROPERTY_DELETE_DEFAULT_FILE_FORMAT: &str = "write.delete.format.default";
212 pub const PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT: &str = "parquet";
214
215 pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = "write.target-file-size-bytes";
217 pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 * 1024; pub const PROPERTY_METADATA_COMPRESSION_CODEC: &str = "write.metadata.compression-codec";
222 pub const PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT: &str = "none";
224 pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED: &str = "write.datafusion.fanout.enabled";
227 pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT: bool = true;
229
230 pub const PROPERTY_GC_ENABLED: &str = "gc.enabled";
234 pub const PROPERTY_GC_ENABLED_DEFAULT: bool = true;
236
237 pub const PROPERTY_PARQUET_CDC_ENABLED: &str = "write.parquet.content-defined-chunking.enabled";
239 pub const PROPERTY_PARQUET_CDC_ENABLED_DEFAULT: bool = false;
241 pub const PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE: &str =
243 "write.parquet.content-defined-chunking.min-chunk-size";
244 pub const PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE_DEFAULT: usize = 256 * 1024;
246 pub const PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE: &str =
248 "write.parquet.content-defined-chunking.max-chunk-size";
249 pub const PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE_DEFAULT: usize = 1024 * 1024;
251 pub const PROPERTY_PARQUET_CDC_NORM_LEVEL: &str =
253 "write.parquet.content-defined-chunking.norm-level";
254 pub const PROPERTY_PARQUET_CDC_NORM_LEVEL_DEFAULT: i32 = 0;
256}
257
258impl TryFrom<&HashMap<String, String>> for TableProperties {
259 type Error = Error;
261
262 fn try_from(props: &HashMap<String, String>) -> Result<Self> {
263 Ok(TableProperties {
264 commit_num_retries: parse_property(
265 props,
266 TableProperties::PROPERTY_COMMIT_NUM_RETRIES,
267 TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT,
268 )?,
269 commit_min_retry_wait_ms: parse_property(
270 props,
271 TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS,
272 TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
273 )?,
274 commit_max_retry_wait_ms: parse_property(
275 props,
276 TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS,
277 TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
278 )?,
279 commit_total_retry_timeout_ms: parse_property(
280 props,
281 TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS,
282 TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
283 )?,
284 write_format_default: parse_property(
285 props,
286 TableProperties::PROPERTY_DEFAULT_FILE_FORMAT,
287 TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string(),
288 )?,
289 write_target_file_size_bytes: parse_property(
290 props,
291 TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES,
292 TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
293 )?,
294 metadata_compression_codec: parse_metadata_file_compression(props)?,
295 write_datafusion_fanout_enabled: parse_property(
296 props,
297 TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED,
298 TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT,
299 )?,
300 gc_enabled: parse_property(
301 props,
302 TableProperties::PROPERTY_GC_ENABLED,
303 TableProperties::PROPERTY_GC_ENABLED_DEFAULT,
304 )?,
305 cdc_enabled: parse_property(
306 props,
307 TableProperties::PROPERTY_PARQUET_CDC_ENABLED,
308 TableProperties::PROPERTY_PARQUET_CDC_ENABLED_DEFAULT,
309 )?,
310 cdc_min_chunk_size: parse_property(
311 props,
312 TableProperties::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE,
313 TableProperties::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE_DEFAULT,
314 )?,
315 cdc_max_chunk_size: parse_property(
316 props,
317 TableProperties::PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE,
318 TableProperties::PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE_DEFAULT,
319 )?,
320 cdc_norm_level: parse_property(
321 props,
322 TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL,
323 TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL_DEFAULT,
324 )?,
325 })
326 }
327}
328
329#[cfg(test)]
330mod tests {
331 use super::*;
332 use crate::compression::CompressionCodec;
333
334 #[test]
335 fn test_table_properties_default() {
336 let props = HashMap::new();
337 let table_properties = TableProperties::try_from(&props).unwrap();
338 assert_eq!(
339 table_properties.commit_num_retries,
340 TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT
341 );
342 assert_eq!(
343 table_properties.commit_min_retry_wait_ms,
344 TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT
345 );
346 assert_eq!(
347 table_properties.commit_max_retry_wait_ms,
348 TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT
349 );
350 assert_eq!(
351 table_properties.write_format_default,
352 TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string()
353 );
354 assert_eq!(
355 table_properties.write_target_file_size_bytes,
356 TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
357 );
358 assert_eq!(
360 table_properties.metadata_compression_codec,
361 CompressionCodec::None
362 );
363 assert_eq!(
364 table_properties.gc_enabled,
365 TableProperties::PROPERTY_GC_ENABLED_DEFAULT
366 );
367 }
368
369 #[test]
370 fn test_table_properties_compression() {
371 let props = HashMap::from([(
372 TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
373 "gzip".to_string(),
374 )]);
375 let table_properties = TableProperties::try_from(&props).unwrap();
376 assert_eq!(
377 table_properties.metadata_compression_codec,
378 CompressionCodec::gzip_default()
379 );
380 }
381
382 #[test]
383 fn test_table_properties_compression_none() {
384 let props = HashMap::from([(
385 TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
386 "none".to_string(),
387 )]);
388 let table_properties = TableProperties::try_from(&props).unwrap();
389 assert_eq!(
390 table_properties.metadata_compression_codec,
391 CompressionCodec::None
392 );
393 }
394
395 #[test]
396 fn test_table_properties_compression_case_insensitive() {
397 let props_upper = HashMap::from([(
399 TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
400 "GZIP".to_string(),
401 )]);
402 let table_properties = TableProperties::try_from(&props_upper).unwrap();
403 assert_eq!(
404 table_properties.metadata_compression_codec,
405 CompressionCodec::gzip_default()
406 );
407
408 let props_mixed = HashMap::from([(
410 TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
411 "GzIp".to_string(),
412 )]);
413 let table_properties = TableProperties::try_from(&props_mixed).unwrap();
414 assert_eq!(
415 table_properties.metadata_compression_codec,
416 CompressionCodec::gzip_default()
417 );
418
419 let props_none_upper = HashMap::from([(
421 TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
422 "NONE".to_string(),
423 )]);
424 let table_properties = TableProperties::try_from(&props_none_upper).unwrap();
425 assert_eq!(
426 table_properties.metadata_compression_codec,
427 CompressionCodec::None
428 );
429 }
430
431 #[test]
432 fn test_table_properties_valid() {
433 let props = HashMap::from([
434 (
435 TableProperties::PROPERTY_COMMIT_NUM_RETRIES.to_string(),
436 "10".to_string(),
437 ),
438 (
439 TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS.to_string(),
440 "20".to_string(),
441 ),
442 (
443 TableProperties::PROPERTY_DEFAULT_FILE_FORMAT.to_string(),
444 "avro".to_string(),
445 ),
446 (
447 TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(),
448 "512".to_string(),
449 ),
450 (
451 TableProperties::PROPERTY_GC_ENABLED.to_string(),
452 "false".to_string(),
453 ),
454 ]);
455 let table_properties = TableProperties::try_from(&props).unwrap();
456 assert_eq!(table_properties.commit_num_retries, 10);
457 assert_eq!(table_properties.commit_max_retry_wait_ms, 20);
458 assert_eq!(table_properties.write_format_default, "avro".to_string());
459 assert_eq!(table_properties.write_target_file_size_bytes, 512);
460 assert!(!table_properties.gc_enabled);
461 }
462
463 #[test]
464 fn test_table_properties_invalid() {
465 let invalid_retries = HashMap::from([(
466 TableProperties::PROPERTY_COMMIT_NUM_RETRIES.to_string(),
467 "abc".to_string(),
468 )]);
469
470 let table_properties = TableProperties::try_from(&invalid_retries).unwrap_err();
471 assert!(
472 table_properties.to_string().contains(
473 "Invalid value for commit.retry.num-retries: invalid digit found in string"
474 )
475 );
476
477 let invalid_min_wait = HashMap::from([(
478 TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS.to_string(),
479 "abc".to_string(),
480 )]);
481 let table_properties = TableProperties::try_from(&invalid_min_wait).unwrap_err();
482 assert!(
483 table_properties.to_string().contains(
484 "Invalid value for commit.retry.min-wait-ms: invalid digit found in string"
485 )
486 );
487
488 let invalid_max_wait = HashMap::from([(
489 TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS.to_string(),
490 "abc".to_string(),
491 )]);
492 let table_properties = TableProperties::try_from(&invalid_max_wait).unwrap_err();
493 assert!(
494 table_properties.to_string().contains(
495 "Invalid value for commit.retry.max-wait-ms: invalid digit found in string"
496 )
497 );
498
499 let invalid_target_size = HashMap::from([(
500 TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(),
501 "abc".to_string(),
502 )]);
503 let table_properties = TableProperties::try_from(&invalid_target_size).unwrap_err();
504 assert!(table_properties.to_string().contains(
505 "Invalid value for write.target-file-size-bytes: invalid digit found in string"
506 ));
507
508 let invalid_gc_enabled = HashMap::from([(
509 TableProperties::PROPERTY_GC_ENABLED.to_string(),
510 "notabool".to_string(),
511 )]);
512 let table_properties = TableProperties::try_from(&invalid_gc_enabled).unwrap_err();
513 assert!(
514 table_properties
515 .to_string()
516 .contains("Invalid value for gc.enabled")
517 );
518 }
519
520 #[test]
521 fn test_table_properties_compression_invalid_rejected() {
522 let invalid_codecs = ["lz4", "zstd", "snappy"];
523
524 for codec in invalid_codecs {
525 let props = HashMap::from([(
526 TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
527 codec.to_string(),
528 )]);
529 let err = TableProperties::try_from(&props).unwrap_err();
530 let err_msg = err.to_string();
531 assert!(
532 err_msg.contains(&format!("Invalid metadata compression codec: {codec}")),
533 "Expected error message to contain codec '{codec}', got: {err_msg}"
534 );
535 assert!(
536 err_msg.contains("Only 'none' and 'gzip' are supported"),
537 "Expected error message to contain supported codecs, got: {err_msg}"
538 );
539 }
540 }
541
542 #[test]
543 fn test_parse_metadata_file_compression_valid() {
544 let props = HashMap::from([(
546 TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
547 "none".to_string(),
548 )]);
549 assert_eq!(
550 parse_metadata_file_compression(&props).unwrap(),
551 CompressionCodec::None
552 );
553
554 let props = HashMap::from([(
556 TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
557 "".to_string(),
558 )]);
559 assert_eq!(
560 parse_metadata_file_compression(&props).unwrap(),
561 CompressionCodec::None
562 );
563
564 let props = HashMap::from([(
566 TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
567 "gzip".to_string(),
568 )]);
569 assert_eq!(
570 parse_metadata_file_compression(&props).unwrap(),
571 CompressionCodec::gzip_default()
572 );
573
574 let props = HashMap::from([(
576 TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
577 "NONE".to_string(),
578 )]);
579 assert_eq!(
580 parse_metadata_file_compression(&props).unwrap(),
581 CompressionCodec::None
582 );
583
584 let props = HashMap::from([(
586 TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
587 "GZIP".to_string(),
588 )]);
589 assert_eq!(
590 parse_metadata_file_compression(&props).unwrap(),
591 CompressionCodec::gzip_default()
592 );
593
594 let props = HashMap::from([(
596 TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
597 "GzIp".to_string(),
598 )]);
599 assert_eq!(
600 parse_metadata_file_compression(&props).unwrap(),
601 CompressionCodec::gzip_default()
602 );
603
604 let props = HashMap::new();
606 assert_eq!(
607 parse_metadata_file_compression(&props).unwrap(),
608 CompressionCodec::None
609 );
610 }
611
612 #[test]
613 fn test_parse_metadata_file_compression_invalid() {
614 let invalid_codecs = ["lz4", "zstd", "snappy"];
615
616 for codec in invalid_codecs {
617 let props = HashMap::from([(
618 TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
619 codec.to_string(),
620 )]);
621 let err = parse_metadata_file_compression(&props).unwrap_err();
622 let err_msg = err.to_string();
623 assert!(
624 err_msg.contains("Invalid metadata compression codec"),
625 "Expected error message to contain 'Invalid metadata compression codec', got: {err_msg}"
626 );
627 assert!(
628 err_msg.contains("Only 'none' and 'gzip' are supported"),
629 "Expected error message to contain supported codecs, got: {err_msg}"
630 );
631 }
632 }
633
634 #[test]
635 fn test_cdc_disabled_by_default() {
636 let props = HashMap::new();
637 let tp = TableProperties::try_from(&props).unwrap();
638 assert!(!tp.cdc_enabled);
639 }
640
641 #[test]
642 fn test_cdc_enabled_via_flag() {
643 let props = HashMap::from([(
644 TableProperties::PROPERTY_PARQUET_CDC_ENABLED.to_string(),
645 "true".to_string(),
646 )]);
647 let tp = TableProperties::try_from(&props).unwrap();
648 assert!(tp.cdc_enabled);
649 assert_eq!(tp.cdc_min_chunk_size, 256 * 1024);
650 assert_eq!(tp.cdc_max_chunk_size, 1024 * 1024);
651 assert_eq!(tp.cdc_norm_level, 0);
652 }
653
654 #[test]
655 fn test_cdc_size_props_alone_do_not_enable() {
656 let props = HashMap::from([(
657 TableProperties::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE.to_string(),
658 "262144".to_string(),
659 )]);
660 let tp = TableProperties::try_from(&props).unwrap();
661 assert!(!tp.cdc_enabled);
662 }
663
664 #[test]
665 fn test_cdc_custom_values() {
666 let props = HashMap::from([
667 (
668 TableProperties::PROPERTY_PARQUET_CDC_ENABLED.to_string(),
669 "true".to_string(),
670 ),
671 (
672 TableProperties::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE.to_string(),
673 "200000".to_string(),
674 ),
675 (
676 TableProperties::PROPERTY_PARQUET_CDC_MAX_CHUNK_SIZE.to_string(),
677 "900000".to_string(),
678 ),
679 (
680 TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL.to_string(),
681 "1".to_string(),
682 ),
683 ]);
684 let tp = TableProperties::try_from(&props).unwrap();
685 assert!(tp.cdc_enabled);
686 assert_eq!(tp.cdc_min_chunk_size, 200000);
687 assert_eq!(tp.cdc_max_chunk_size, 900000);
688 assert_eq!(tp.cdc_norm_level, 1);
689 }
690
691 #[test]
692 fn test_cdc_partial_override() {
693 let props = HashMap::from([
694 (
695 TableProperties::PROPERTY_PARQUET_CDC_ENABLED.to_string(),
696 "true".to_string(),
697 ),
698 (
699 TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL.to_string(),
700 "2".to_string(),
701 ),
702 ]);
703 let tp = TableProperties::try_from(&props).unwrap();
704 assert!(tp.cdc_enabled);
705 assert_eq!(tp.cdc_min_chunk_size, 256 * 1024);
706 assert_eq!(tp.cdc_max_chunk_size, 1024 * 1024);
707 assert_eq!(tp.cdc_norm_level, 2);
708 }
709
710 #[test]
711 fn test_cdc_negative_norm_level() {
712 let props = HashMap::from([
713 (
714 TableProperties::PROPERTY_PARQUET_CDC_ENABLED.to_string(),
715 "true".to_string(),
716 ),
717 (
718 TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL.to_string(),
719 "-2".to_string(),
720 ),
721 ]);
722 let tp = TableProperties::try_from(&props).unwrap();
723 assert_eq!(tp.cdc_norm_level, -2);
724 }
725
726 #[test]
727 fn test_cdc_invalid_min_chunk_size() {
728 let props = HashMap::from([
729 (
730 TableProperties::PROPERTY_PARQUET_CDC_ENABLED.to_string(),
731 "true".to_string(),
732 ),
733 (
734 TableProperties::PROPERTY_PARQUET_CDC_MIN_CHUNK_SIZE.to_string(),
735 "not_a_number".to_string(),
736 ),
737 ]);
738 let err = TableProperties::try_from(&props).unwrap_err();
739 assert!(
740 err.to_string().contains(
741 "Invalid value for write.parquet.content-defined-chunking.min-chunk-size"
742 )
743 );
744 }
745
746 #[test]
747 fn test_cdc_invalid_norm_level() {
748 let props = HashMap::from([
749 (
750 TableProperties::PROPERTY_PARQUET_CDC_ENABLED.to_string(),
751 "true".to_string(),
752 ),
753 (
754 TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL.to_string(),
755 "not_a_number".to_string(),
756 ),
757 ]);
758 let err = TableProperties::try_from(&props).unwrap_err();
759 assert!(
760 err.to_string()
761 .contains("Invalid value for write.parquet.content-defined-chunking.norm-level")
762 );
763 }
764
765 #[test]
766 fn test_cdc_no_properties() {
767 let props = HashMap::from([("some.other.property".to_string(), "value".to_string())]);
768 let tp = TableProperties::try_from(&props).unwrap();
769 assert!(!tp.cdc_enabled);
770 }
771}