1use std::collections::HashMap;
19use std::fmt::Display;
20use std::str::FromStr;
21
22use crate::compression::CompressionCodec;
23use crate::error::{Error, ErrorKind, Result};
24
25fn parse_property<T: FromStr>(
28 properties: &HashMap<String, String>,
29 key: &str,
30 default: T,
31) -> Result<T>
32where
33 <T as FromStr>::Err: Display,
34{
35 properties.get(key).map_or(Ok(default), |value| {
36 value.parse::<T>().map_err(|e| {
37 Error::new(
38 ErrorKind::DataInvalid,
39 format!("Invalid value for {key}: {e}"),
40 )
41 })
42 })
43}
44
45pub(crate) fn parse_metadata_file_compression(
58 properties: &HashMap<String, String>,
59) -> Result<CompressionCodec> {
60 let value = properties
61 .get(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC)
62 .map(|s| s.as_str())
63 .unwrap_or(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT);
64
65 if value.is_empty() {
67 return Ok(CompressionCodec::None);
68 }
69
70 let lowercase_value = value.to_lowercase();
72
73 let codec: CompressionCodec = serde_json::from_value(serde_json::Value::String(
75 lowercase_value,
76 ))
77 .map_err(|_| {
78 Error::new(
79 ErrorKind::DataInvalid,
80 format!(
81 "Invalid metadata compression codec: {value}. Only 'none' and 'gzip' are supported."
82 ),
83 )
84 })?;
85
86 match codec {
88 CompressionCodec::None | CompressionCodec::Gzip => Ok(codec),
89 CompressionCodec::Lz4 | CompressionCodec::Zstd => Err(Error::new(
90 ErrorKind::DataInvalid,
91 format!(
92 "Invalid metadata compression codec: {value}. Only 'none' and 'gzip' are supported for metadata files."
93 ),
94 )),
95 }
96}
97
98#[derive(Debug)]
100pub struct TableProperties {
101 pub commit_num_retries: usize,
103 pub commit_min_retry_wait_ms: u64,
105 pub commit_max_retry_wait_ms: u64,
107 pub commit_total_retry_timeout_ms: u64,
109 pub write_format_default: String,
111 pub write_target_file_size_bytes: usize,
113 pub metadata_compression_codec: CompressionCodec,
115 pub write_datafusion_fanout_enabled: bool,
117}
118
119impl TableProperties {
120 pub const PROPERTY_FORMAT_VERSION: &str = "format-version";
130 pub const PROPERTY_UUID: &str = "uuid";
132 pub const PROPERTY_SNAPSHOT_COUNT: &str = "snapshot-count";
134 pub const PROPERTY_CURRENT_SNAPSHOT_SUMMARY: &str = "current-snapshot-summary";
136 pub const PROPERTY_CURRENT_SNAPSHOT_ID: &str = "current-snapshot-id";
138 pub const PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP: &str = "current-snapshot-timestamp-ms";
140 pub const PROPERTY_CURRENT_SCHEMA: &str = "current-schema";
142 pub const PROPERTY_DEFAULT_PARTITION_SPEC: &str = "default-partition-spec";
144 pub const PROPERTY_DEFAULT_SORT_ORDER: &str = "default-sort-order";
146
147 pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX: &str =
149 "write.metadata.previous-versions-max";
150 pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100;
152
153 pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT: &str = "write.summary.partition-limit";
155 pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT: u64 = 0;
157
158 pub const RESERVED_PROPERTIES: [&str; 9] = [
163 Self::PROPERTY_FORMAT_VERSION,
164 Self::PROPERTY_UUID,
165 Self::PROPERTY_SNAPSHOT_COUNT,
166 Self::PROPERTY_CURRENT_SNAPSHOT_ID,
167 Self::PROPERTY_CURRENT_SNAPSHOT_SUMMARY,
168 Self::PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP,
169 Self::PROPERTY_CURRENT_SCHEMA,
170 Self::PROPERTY_DEFAULT_PARTITION_SPEC,
171 Self::PROPERTY_DEFAULT_SORT_ORDER,
172 ];
173
174 pub const PROPERTY_COMMIT_NUM_RETRIES: &str = "commit.retry.num-retries";
176 pub const PROPERTY_COMMIT_NUM_RETRIES_DEFAULT: usize = 4;
178
179 pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS: &str = "commit.retry.min-wait-ms";
181 pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT: u64 = 100;
183
184 pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS: &str = "commit.retry.max-wait-ms";
186 pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT: u64 = 60 * 1000; pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS: &str = "commit.retry.total-timeout-ms";
191 pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000; pub const PROPERTY_DEFAULT_FILE_FORMAT: &str = "write.format.default";
196 pub const PROPERTY_DELETE_DEFAULT_FILE_FORMAT: &str = "write.delete.format.default";
198 pub const PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT: &str = "parquet";
200
201 pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = "write.target-file-size-bytes";
203 pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 * 1024; pub const PROPERTY_METADATA_COMPRESSION_CODEC: &str = "write.metadata.compression-codec";
208 pub const PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT: &str = "none";
210 pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED: &str = "write.datafusion.fanout.enabled";
213 pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT: bool = true;
215}
216
217impl TryFrom<&HashMap<String, String>> for TableProperties {
218 type Error = Error;
220
221 fn try_from(props: &HashMap<String, String>) -> Result<Self> {
222 Ok(TableProperties {
223 commit_num_retries: parse_property(
224 props,
225 TableProperties::PROPERTY_COMMIT_NUM_RETRIES,
226 TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT,
227 )?,
228 commit_min_retry_wait_ms: parse_property(
229 props,
230 TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS,
231 TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
232 )?,
233 commit_max_retry_wait_ms: parse_property(
234 props,
235 TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS,
236 TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
237 )?,
238 commit_total_retry_timeout_ms: parse_property(
239 props,
240 TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS,
241 TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
242 )?,
243 write_format_default: parse_property(
244 props,
245 TableProperties::PROPERTY_DEFAULT_FILE_FORMAT,
246 TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string(),
247 )?,
248 write_target_file_size_bytes: parse_property(
249 props,
250 TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES,
251 TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
252 )?,
253 metadata_compression_codec: parse_metadata_file_compression(props)?,
254 write_datafusion_fanout_enabled: parse_property(
255 props,
256 TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED,
257 TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT,
258 )?,
259 })
260 }
261}
262
#[cfg(test)]
mod tests {
    use super::*;
    use crate::compression::CompressionCodec;

    /// An empty property map must yield the default for every field.
    #[test]
    fn test_table_properties_default() {
        let props = HashMap::new();
        let table_properties = TableProperties::try_from(&props).unwrap();
        assert_eq!(
            table_properties.commit_num_retries,
            TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT
        );
        assert_eq!(
            table_properties.commit_min_retry_wait_ms,
            TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT
        );
        assert_eq!(
            table_properties.commit_max_retry_wait_ms,
            TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT
        );
        assert_eq!(
            table_properties.commit_total_retry_timeout_ms,
            TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT
        );
        assert_eq!(
            table_properties.write_format_default,
            TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string()
        );
        assert_eq!(
            table_properties.write_target_file_size_bytes,
            TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
        );
        assert_eq!(
            table_properties.metadata_compression_codec,
            CompressionCodec::None
        );
        assert_eq!(
            table_properties.write_datafusion_fanout_enabled,
            TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT
        );
    }

    #[test]
    fn test_table_properties_compression() {
        let props = HashMap::from([(
            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
            "gzip".to_string(),
        )]);
        let table_properties = TableProperties::try_from(&props).unwrap();
        assert_eq!(
            table_properties.metadata_compression_codec,
            CompressionCodec::Gzip
        );
    }

    #[test]
    fn test_table_properties_compression_none() {
        let props = HashMap::from([(
            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
            "none".to_string(),
        )]);
        let table_properties = TableProperties::try_from(&props).unwrap();
        assert_eq!(
            table_properties.metadata_compression_codec,
            CompressionCodec::None
        );
    }

    /// Codec names are matched regardless of letter case.
    #[test]
    fn test_table_properties_compression_case_insensitive() {
        let cases = [
            ("GZIP", CompressionCodec::Gzip),
            ("GzIp", CompressionCodec::Gzip),
            ("NONE", CompressionCodec::None),
        ];

        for (raw, expected) in cases {
            let props = HashMap::from([(
                TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
                raw.to_string(),
            )]);
            let table_properties = TableProperties::try_from(&props).unwrap();
            assert_eq!(
                table_properties.metadata_compression_codec, expected,
                "unexpected codec for input '{raw}'"
            );
        }
    }

    /// Explicit values override the defaults for every parsed field.
    #[test]
    fn test_table_properties_valid() {
        let props = HashMap::from([
            (
                TableProperties::PROPERTY_COMMIT_NUM_RETRIES.to_string(),
                "10".to_string(),
            ),
            (
                TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS.to_string(),
                "5".to_string(),
            ),
            (
                TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS.to_string(),
                "20".to_string(),
            ),
            (
                TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS.to_string(),
                "1000".to_string(),
            ),
            (
                TableProperties::PROPERTY_DEFAULT_FILE_FORMAT.to_string(),
                "avro".to_string(),
            ),
            (
                TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(),
                "512".to_string(),
            ),
            (
                TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED.to_string(),
                "false".to_string(),
            ),
        ]);
        let table_properties = TableProperties::try_from(&props).unwrap();
        assert_eq!(table_properties.commit_num_retries, 10);
        assert_eq!(table_properties.commit_min_retry_wait_ms, 5);
        assert_eq!(table_properties.commit_max_retry_wait_ms, 20);
        assert_eq!(table_properties.commit_total_retry_timeout_ms, 1000);
        assert_eq!(table_properties.write_format_default, "avro".to_string());
        assert_eq!(table_properties.write_target_file_size_bytes, 512);
        assert!(!table_properties.write_datafusion_fanout_enabled);
    }

    /// Unparseable values are rejected with a message naming the offending key.
    #[test]
    fn test_table_properties_invalid() {
        // (property key, parser error detail). An empty detail asserts only the
        // "Invalid value for {key}: " prefix, so the test does not depend on the exact
        // wording of `ParseBoolError`.
        let cases = [
            (
                TableProperties::PROPERTY_COMMIT_NUM_RETRIES,
                "invalid digit found in string",
            ),
            (
                TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS,
                "invalid digit found in string",
            ),
            (
                TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS,
                "invalid digit found in string",
            ),
            (
                TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS,
                "invalid digit found in string",
            ),
            (
                TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES,
                "invalid digit found in string",
            ),
            (TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED, ""),
        ];

        for (key, detail) in cases {
            let props = HashMap::from([(key.to_string(), "abc".to_string())]);
            let err_msg = TableProperties::try_from(&props).unwrap_err().to_string();
            let expected = format!("Invalid value for {key}: {detail}");
            assert!(
                err_msg.contains(&expected),
                "Expected error message to contain '{expected}', got: {err_msg}"
            );
        }
    }

    #[test]
    fn test_table_properties_compression_invalid_rejected() {
        let invalid_codecs = ["lz4", "zstd", "snappy"];

        for codec in invalid_codecs {
            let props = HashMap::from([(
                TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
                codec.to_string(),
            )]);
            let err = TableProperties::try_from(&props).unwrap_err();
            let err_msg = err.to_string();
            assert!(
                err_msg.contains(&format!("Invalid metadata compression codec: {codec}")),
                "Expected error message to contain codec '{codec}', got: {err_msg}"
            );
            assert!(
                err_msg.contains("Only 'none' and 'gzip' are supported"),
                "Expected error message to contain supported codecs, got: {err_msg}"
            );
        }
    }

    #[test]
    fn test_parse_metadata_file_compression_valid() {
        let cases = [
            ("none", CompressionCodec::None),
            ("", CompressionCodec::None),
            ("gzip", CompressionCodec::Gzip),
            ("NONE", CompressionCodec::None),
            ("GZIP", CompressionCodec::Gzip),
            ("GzIp", CompressionCodec::Gzip),
        ];

        for (raw, expected) in cases {
            let props = HashMap::from([(
                TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
                raw.to_string(),
            )]);
            assert_eq!(
                parse_metadata_file_compression(&props).unwrap(),
                expected,
                "unexpected codec for input '{raw}'"
            );
        }

        // A missing key falls back to the default codec.
        let props = HashMap::new();
        assert_eq!(
            parse_metadata_file_compression(&props).unwrap(),
            CompressionCodec::None
        );
    }

    #[test]
    fn test_parse_metadata_file_compression_invalid() {
        let invalid_codecs = ["lz4", "zstd", "snappy"];

        for codec in invalid_codecs {
            let props = HashMap::from([(
                TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
                codec.to_string(),
            )]);
            let err = parse_metadata_file_compression(&props).unwrap_err();
            let err_msg = err.to_string();
            assert!(
                err_msg.contains("Invalid metadata compression codec"),
                "Expected error message to contain 'Invalid metadata compression codec', got: {err_msg}"
            );
            assert!(
                err_msg.contains("Only 'none' and 'gzip' are supported"),
                "Expected error message to contain supported codecs, got: {err_msg}"
            );
        }
    }
}