1use std::collections::HashMap;
19
20use itertools::Itertools;
21
22use super::{DataContentType, DataFile, PartitionSpecRef};
23use crate::spec::{ManifestContentType, ManifestFile, Operation, SchemaRef, Summary};
24use crate::{Error, ErrorKind, Result};
25
// Keys of the snapshot summary properties that this module reads and writes.

// Per-snapshot deltas: what a single snapshot added to or removed from the table.
const ADDED_DATA_FILES: &str = "added-data-files";
const ADDED_DELETE_FILES: &str = "added-delete-files";
const ADDED_EQUALITY_DELETES: &str = "added-equality-deletes";
const ADDED_FILE_SIZE: &str = "added-files-size";
const ADDED_POSITION_DELETES: &str = "added-position-deletes";
const ADDED_POSITION_DELETE_FILES: &str = "added-position-delete-files";
const ADDED_RECORDS: &str = "added-records";
const DELETED_DATA_FILES: &str = "deleted-data-files";
const DELETED_RECORDS: &str = "deleted-records";
const ADDED_EQUALITY_DELETE_FILES: &str = "added-equality-delete-files";
const REMOVED_DELETE_FILES: &str = "removed-delete-files";
const REMOVED_EQUALITY_DELETES: &str = "removed-equality-deletes";
const REMOVED_EQUALITY_DELETE_FILES: &str = "removed-equality-delete-files";
const REMOVED_FILE_SIZE: &str = "removed-files-size";
const REMOVED_POSITION_DELETES: &str = "removed-position-deletes";
const REMOVED_POSITION_DELETE_FILES: &str = "removed-position-delete-files";
// Running totals, derived from the previous snapshot's totals plus the deltas above.
const TOTAL_EQUALITY_DELETES: &str = "total-equality-deletes";
const TOTAL_POSITION_DELETES: &str = "total-position-deletes";
const TOTAL_DATA_FILES: &str = "total-data-files";
const TOTAL_DELETE_FILES: &str = "total-delete-files";
const TOTAL_RECORDS: &str = "total-records";
const TOTAL_FILE_SIZE: &str = "total-files-size";
// Number of partitions touched by the snapshot.
const CHANGED_PARTITION_COUNT_PROP: &str = "changed-partition-count";
// Prefix of the per-partition summary keys; the partition path is appended to it.
const CHANGED_PARTITION_PREFIX: &str = "partitions.";
50
51#[derive(Default)]
55pub struct SnapshotSummaryCollector {
56 metrics: UpdateMetrics,
57 partition_metrics: HashMap<String, UpdateMetrics>,
58 max_changed_partitions_for_summaries: u64,
59 properties: HashMap<String, String>,
60 trust_partition_metrics: bool,
61}
62
63impl SnapshotSummaryCollector {
64 pub fn set(&mut self, key: &str, value: &str) {
66 self.properties.insert(key.to_string(), value.to_string());
67 }
68
69 pub fn set_partition_summary_limit(&mut self, limit: u64) {
72 self.max_changed_partitions_for_summaries = limit;
73 }
74
75 pub fn add_file(
77 &mut self,
78 data_file: &DataFile,
79 schema: SchemaRef,
80 partition_spec: PartitionSpecRef,
81 ) {
82 self.metrics.add_file(data_file);
83 if !data_file.partition.fields().is_empty() {
84 self.update_partition_metrics(schema, partition_spec, data_file, true);
85 }
86 }
87
88 pub fn remove_file(
90 &mut self,
91 data_file: &DataFile,
92 schema: SchemaRef,
93 partition_spec: PartitionSpecRef,
94 ) {
95 self.metrics.remove_file(data_file);
96 if !data_file.partition.fields().is_empty() {
97 self.update_partition_metrics(schema, partition_spec, data_file, false);
98 }
99 }
100
101 pub fn add_manifest(&mut self, manifest: &ManifestFile) {
103 self.trust_partition_metrics = false;
104 self.partition_metrics.clear();
105 self.metrics.add_manifest(manifest);
106 }
107
108 pub fn update_partition_metrics(
110 &mut self,
111 schema: SchemaRef,
112 partition_spec: PartitionSpecRef,
113 data_file: &DataFile,
114 is_add_file: bool,
115 ) {
116 let partition_path = partition_spec.partition_to_path(&data_file.partition, schema);
117 let metrics = self.partition_metrics.entry(partition_path).or_default();
118
119 if is_add_file {
120 metrics.add_file(data_file);
121 } else {
122 metrics.remove_file(data_file);
123 }
124 }
125
126 pub fn merge(&mut self, summary: SnapshotSummaryCollector) {
128 self.metrics.merge(&summary.metrics);
129 self.properties.extend(summary.properties);
130
131 if self.trust_partition_metrics && summary.trust_partition_metrics {
132 for (partition, partition_metric) in summary.partition_metrics.iter() {
133 self.partition_metrics
134 .entry(partition.to_string())
135 .or_default()
136 .merge(partition_metric);
137 }
138 } else {
139 self.partition_metrics.clear();
140 self.trust_partition_metrics = false;
141 }
142 }
143
144 pub fn build(&self) -> HashMap<String, String> {
146 let mut properties = self.metrics.to_map();
147 let changed_partitions_count = self.partition_metrics.len() as u64;
148 set_if_positive(
149 &mut properties,
150 changed_partitions_count,
151 CHANGED_PARTITION_COUNT_PROP,
152 );
153
154 if changed_partitions_count <= self.max_changed_partitions_for_summaries {
155 for (partition_path, update_metrics_partition) in &self.partition_metrics {
156 let property_key = format!("{CHANGED_PARTITION_PREFIX}{partition_path}");
157 let partition_summary = update_metrics_partition
158 .to_map()
159 .into_iter()
160 .map(|(property, value)| format!("{property}={value}"))
161 .join(",");
162
163 if !partition_summary.is_empty() {
164 properties.insert(property_key, partition_summary);
165 }
166 }
167 }
168 properties
169 }
170}
171
/// Counters describing what a set of file changes added to and removed from a
/// table (or from a single partition). All counters start at zero.
#[derive(Debug, Default)]
struct UpdateMetrics {
    // Sum of `file_size_in_bytes` over added / removed files of any content type.
    added_file_size: u64,
    removed_file_size: u64,
    // Number of added / removed data files.
    added_data_files: u32,
    removed_data_files: u32,
    // Number of added / removed equality-delete files.
    added_eq_delete_files: u64,
    removed_eq_delete_files: u64,
    // Number of added / removed position-delete files.
    added_pos_delete_files: u64,
    removed_pos_delete_files: u64,
    // Number of added / removed delete files of either kind.
    added_delete_files: u32,
    removed_delete_files: u32,
    // Sum of `record_count` over added / removed data files.
    added_records: u64,
    deleted_records: u64,
    // Sum of `record_count` over added / removed position-delete files.
    added_pos_deletes: u64,
    removed_pos_deletes: u64,
    // Sum of `record_count` over added / removed equality-delete files.
    added_eq_deletes: u64,
    removed_eq_deletes: u64,
}
191
192impl UpdateMetrics {
193 fn add_file(&mut self, data_file: &DataFile) {
194 self.added_file_size += data_file.file_size_in_bytes;
195 match data_file.content_type() {
196 DataContentType::Data => {
197 self.added_data_files += 1;
198 self.added_records += data_file.record_count;
199 }
200 DataContentType::PositionDeletes => {
201 self.added_delete_files += 1;
202 self.added_pos_delete_files += 1;
203 self.added_pos_deletes += data_file.record_count;
204 }
205 DataContentType::EqualityDeletes => {
206 self.added_delete_files += 1;
207 self.added_eq_delete_files += 1;
208 self.added_eq_deletes += data_file.record_count;
209 }
210 }
211 }
212
213 fn remove_file(&mut self, data_file: &DataFile) {
214 self.removed_file_size += data_file.file_size_in_bytes;
215 match data_file.content_type() {
216 DataContentType::Data => {
217 self.removed_data_files += 1;
218 self.deleted_records += data_file.record_count;
219 }
220 DataContentType::PositionDeletes => {
221 self.removed_delete_files += 1;
222 self.removed_pos_delete_files += 1;
223 self.removed_pos_deletes += data_file.record_count;
224 }
225 DataContentType::EqualityDeletes => {
226 self.removed_delete_files += 1;
227 self.removed_eq_delete_files += 1;
228 self.removed_eq_deletes += data_file.record_count;
229 }
230 }
231 }
232
233 fn add_manifest(&mut self, manifest: &ManifestFile) {
234 match manifest.content {
235 ManifestContentType::Data => {
236 self.added_data_files += manifest.added_files_count.unwrap_or(0);
237 self.added_records += manifest.added_rows_count.unwrap_or(0);
238 self.removed_data_files += manifest.deleted_files_count.unwrap_or(0);
239 self.deleted_records += manifest.deleted_rows_count.unwrap_or(0);
240 }
241 ManifestContentType::Deletes => {
242 self.added_delete_files += manifest.added_files_count.unwrap_or(0);
243 self.removed_delete_files += manifest.deleted_files_count.unwrap_or(0);
244 }
245 }
246 }
247
248 fn to_map(&self) -> HashMap<String, String> {
249 let mut properties = HashMap::new();
250 set_if_positive(&mut properties, self.added_file_size, ADDED_FILE_SIZE);
251 set_if_positive(&mut properties, self.removed_file_size, REMOVED_FILE_SIZE);
252 set_if_positive(&mut properties, self.added_data_files, ADDED_DATA_FILES);
253 set_if_positive(&mut properties, self.removed_data_files, DELETED_DATA_FILES);
254 set_if_positive(
255 &mut properties,
256 self.added_eq_delete_files,
257 ADDED_EQUALITY_DELETE_FILES,
258 );
259 set_if_positive(
260 &mut properties,
261 self.removed_eq_delete_files,
262 REMOVED_EQUALITY_DELETE_FILES,
263 );
264 set_if_positive(
265 &mut properties,
266 self.added_pos_delete_files,
267 ADDED_POSITION_DELETE_FILES,
268 );
269 set_if_positive(
270 &mut properties,
271 self.removed_pos_delete_files,
272 REMOVED_POSITION_DELETE_FILES,
273 );
274 set_if_positive(&mut properties, self.added_delete_files, ADDED_DELETE_FILES);
275 set_if_positive(
276 &mut properties,
277 self.removed_delete_files,
278 REMOVED_DELETE_FILES,
279 );
280 set_if_positive(&mut properties, self.added_records, ADDED_RECORDS);
281 set_if_positive(&mut properties, self.deleted_records, DELETED_RECORDS);
282 set_if_positive(
283 &mut properties,
284 self.added_pos_deletes,
285 ADDED_POSITION_DELETES,
286 );
287 set_if_positive(
288 &mut properties,
289 self.removed_pos_deletes,
290 REMOVED_POSITION_DELETES,
291 );
292 set_if_positive(
293 &mut properties,
294 self.added_eq_deletes,
295 ADDED_EQUALITY_DELETES,
296 );
297 set_if_positive(
298 &mut properties,
299 self.removed_eq_deletes,
300 REMOVED_EQUALITY_DELETES,
301 );
302 properties
303 }
304
305 fn merge(&mut self, other: &UpdateMetrics) {
306 self.added_file_size += other.added_file_size;
307 self.removed_file_size += other.removed_file_size;
308 self.added_data_files += other.added_data_files;
309 self.removed_data_files += other.removed_data_files;
310 self.added_eq_delete_files += other.added_eq_delete_files;
311 self.removed_eq_delete_files += other.removed_eq_delete_files;
312 self.added_pos_delete_files += other.added_pos_delete_files;
313 self.removed_pos_delete_files += other.removed_pos_delete_files;
314 self.added_delete_files += other.added_delete_files;
315 self.removed_delete_files += other.removed_delete_files;
316 self.added_records += other.added_records;
317 self.deleted_records += other.deleted_records;
318 self.added_pos_deletes += other.added_pos_deletes;
319 self.removed_pos_deletes += other.removed_pos_deletes;
320 self.added_eq_deletes += other.added_eq_deletes;
321 self.removed_eq_deletes += other.removed_eq_deletes;
322 }
323}
324
/// Stores `value` under `property_name` when it is strictly greater than the
/// type's default value (zero for the numeric types used in this module);
/// otherwise leaves `properties` untouched.
fn set_if_positive<T>(properties: &mut HashMap<String, String>, value: T, property_name: &str)
where
    T: PartialOrd + Default + ToString,
{
    let is_positive = value > T::default();
    if !is_positive {
        return;
    }
    properties.insert(property_name.to_owned(), value.to_string());
}
331
332pub(crate) fn update_snapshot_summaries(
333 summary: Summary,
334 previous_summary: Option<&Summary>,
335 truncate_full_table: bool,
336) -> Result<Summary> {
337 if summary.operation != Operation::Append
339 && summary.operation != Operation::Overwrite
340 && summary.operation != Operation::Delete
341 {
342 return Err(Error::new(
343 ErrorKind::DataInvalid,
344 "Operation is not supported.",
345 ));
346 }
347
348 let mut summary = match previous_summary {
349 Some(prev_summary) if truncate_full_table && summary.operation == Operation::Overwrite => {
350 truncate_table_summary(summary, prev_summary).map_err(|err| {
351 Error::new(ErrorKind::Unexpected, "Failed to truncate table summary.")
352 .with_source(err)
353 })?
354 }
355 _ => summary,
356 };
357
358 update_totals(
359 &mut summary,
360 previous_summary,
361 TOTAL_DATA_FILES,
362 ADDED_DATA_FILES,
363 DELETED_DATA_FILES,
364 );
365
366 update_totals(
367 &mut summary,
368 previous_summary,
369 TOTAL_DELETE_FILES,
370 ADDED_DELETE_FILES,
371 REMOVED_DELETE_FILES,
372 );
373
374 update_totals(
375 &mut summary,
376 previous_summary,
377 TOTAL_RECORDS,
378 ADDED_RECORDS,
379 DELETED_RECORDS,
380 );
381
382 update_totals(
383 &mut summary,
384 previous_summary,
385 TOTAL_FILE_SIZE,
386 ADDED_FILE_SIZE,
387 REMOVED_FILE_SIZE,
388 );
389
390 update_totals(
391 &mut summary,
392 previous_summary,
393 TOTAL_POSITION_DELETES,
394 ADDED_POSITION_DELETES,
395 REMOVED_POSITION_DELETES,
396 );
397
398 update_totals(
399 &mut summary,
400 previous_summary,
401 TOTAL_EQUALITY_DELETES,
402 ADDED_EQUALITY_DELETES,
403 REMOVED_EQUALITY_DELETES,
404 );
405 Ok(summary)
406}
407
408fn get_prop(previous_summary: &Summary, prop: &str) -> Result<u64> {
409 let value_str = previous_summary
410 .additional_properties
411 .get(prop)
412 .map(String::as_str)
413 .unwrap_or("0");
414 value_str.parse::<u64>().map_err(|err| {
415 Error::new(
416 ErrorKind::Unexpected,
417 format!("Failed to parse summary property '{prop}' value '{value_str}' as u64."),
418 )
419 .with_source(err)
420 })
421}
422
423fn truncate_table_summary(mut summary: Summary, previous_summary: &Summary) -> Result<Summary> {
424 for prop in [
425 TOTAL_DATA_FILES,
426 TOTAL_DELETE_FILES,
427 TOTAL_RECORDS,
428 TOTAL_FILE_SIZE,
429 TOTAL_POSITION_DELETES,
430 TOTAL_EQUALITY_DELETES,
431 ] {
432 summary
433 .additional_properties
434 .insert(prop.to_string(), "0".to_string());
435 }
436
437 let value = get_prop(previous_summary, TOTAL_DATA_FILES)?;
438 if value != 0 {
439 summary
440 .additional_properties
441 .insert(DELETED_DATA_FILES.to_string(), value.to_string());
442 }
443 let value = get_prop(previous_summary, TOTAL_DELETE_FILES)?;
444 if value != 0 {
445 summary
446 .additional_properties
447 .insert(REMOVED_DELETE_FILES.to_string(), value.to_string());
448 }
449 let value = get_prop(previous_summary, TOTAL_RECORDS)?;
450 if value != 0 {
451 summary
452 .additional_properties
453 .insert(DELETED_RECORDS.to_string(), value.to_string());
454 }
455 let value = get_prop(previous_summary, TOTAL_FILE_SIZE)?;
456 if value != 0 {
457 summary
458 .additional_properties
459 .insert(REMOVED_FILE_SIZE.to_string(), value.to_string());
460 }
461
462 let value = get_prop(previous_summary, TOTAL_POSITION_DELETES)?;
463 if value != 0 {
464 summary
465 .additional_properties
466 .insert(REMOVED_POSITION_DELETES.to_string(), value.to_string());
467 }
468
469 let value = get_prop(previous_summary, TOTAL_EQUALITY_DELETES)?;
470 if value != 0 {
471 summary
472 .additional_properties
473 .insert(REMOVED_EQUALITY_DELETES.to_string(), value.to_string());
474 }
475
476 Ok(summary)
477}
478
479fn update_totals(
480 summary: &mut Summary,
481 previous_summary: Option<&Summary>,
482 total_property: &str,
483 added_property: &str,
484 removed_property: &str,
485) {
486 let previous_total = match previous_summary {
487 None => 0,
488 Some(prev_summary) => match prev_summary.additional_properties.get(total_property) {
489 Some(value_str) => match value_str.parse::<u64>() {
490 Ok(v) => v,
491 Err(parse_err) => {
492 tracing::warn!(
493 "Property '{total_property}' could not be parsed in the previous snapshot summary: {parse_err}. \
494 Skipping total computation.",
495 );
496 return;
497 }
498 },
499 None => {
500 tracing::debug!(
501 "Property '{total_property}' was not set in the previous snapshot summary. \
502 Skipping total computation."
503 );
504 return;
505 }
506 },
507 };
508
509 let parse_delta = |property: &str| -> Option<u64> {
515 match summary.additional_properties.get(property) {
516 None => Some(0),
517 Some(value) => match value.parse::<u64>() {
518 Ok(v) => Some(v),
519 Err(parse_err) => {
520 tracing::warn!(
521 "Property '{property}' could not be parsed when computing '{total_property}': {parse_err}. \
522 Skipping total computation.",
523 );
524 None
525 }
526 },
527 }
528 };
529
530 let (Some(added), Some(removed)) = (parse_delta(added_property), parse_delta(removed_property))
531 else {
532 return;
533 };
534
535 let new_total = previous_total + added - removed;
536 summary
537 .additional_properties
538 .insert(total_property.to_string(), new_total.to_string());
539}
540
541#[cfg(test)]
542mod tests {
543 use std::collections::HashMap;
544 use std::sync::Arc;
545
546 use super::*;
547 use crate::spec::{
548 DataFileFormat, Datum, Literal, NestedField, PartitionSpec, PrimitiveType, Schema, Struct,
549 Transform, Type, UnboundPartitionField,
550 };
551
552 #[test]
553 fn test_update_snapshot_summaries_append() {
554 let prev_props: HashMap<String, String> = [
555 (TOTAL_DATA_FILES.to_string(), "10".to_string()),
556 (TOTAL_DELETE_FILES.to_string(), "5".to_string()),
557 (TOTAL_RECORDS.to_string(), "100".to_string()),
558 (TOTAL_FILE_SIZE.to_string(), "1000".to_string()),
559 (TOTAL_POSITION_DELETES.to_string(), "3".to_string()),
560 (TOTAL_EQUALITY_DELETES.to_string(), "2".to_string()),
561 ]
562 .into_iter()
563 .collect();
564
565 let previous_summary = Summary {
566 operation: Operation::Append,
567 additional_properties: prev_props,
568 };
569
570 let new_props: HashMap<String, String> = [
571 (ADDED_DATA_FILES.to_string(), "4".to_string()),
572 (DELETED_DATA_FILES.to_string(), "1".to_string()),
573 (ADDED_DELETE_FILES.to_string(), "2".to_string()),
574 (REMOVED_DELETE_FILES.to_string(), "1".to_string()),
575 (ADDED_RECORDS.to_string(), "40".to_string()),
576 (DELETED_RECORDS.to_string(), "10".to_string()),
577 (ADDED_FILE_SIZE.to_string(), "400".to_string()),
578 (REMOVED_FILE_SIZE.to_string(), "100".to_string()),
579 (ADDED_POSITION_DELETES.to_string(), "5".to_string()),
580 (REMOVED_POSITION_DELETES.to_string(), "2".to_string()),
581 (ADDED_EQUALITY_DELETES.to_string(), "3".to_string()),
582 (REMOVED_EQUALITY_DELETES.to_string(), "1".to_string()),
583 ]
584 .into_iter()
585 .collect();
586
587 let summary = Summary {
588 operation: Operation::Append,
589 additional_properties: new_props,
590 };
591
592 let updated = update_snapshot_summaries(summary, Some(&previous_summary), false).unwrap();
593
594 assert_eq!(
595 updated.additional_properties.get(TOTAL_DATA_FILES).unwrap(),
596 "13"
597 );
598 assert_eq!(
599 updated
600 .additional_properties
601 .get(TOTAL_DELETE_FILES)
602 .unwrap(),
603 "6"
604 );
605 assert_eq!(
606 updated.additional_properties.get(TOTAL_RECORDS).unwrap(),
607 "130"
608 );
609 assert_eq!(
610 updated.additional_properties.get(TOTAL_FILE_SIZE).unwrap(),
611 "1300"
612 );
613 assert_eq!(
614 updated
615 .additional_properties
616 .get(TOTAL_POSITION_DELETES)
617 .unwrap(),
618 "6"
619 );
620 assert_eq!(
621 updated
622 .additional_properties
623 .get(TOTAL_EQUALITY_DELETES)
624 .unwrap(),
625 "4"
626 );
627 }
628
629 #[test]
630 fn test_truncate_table_summary() {
631 let prev_props: HashMap<String, String> = [
632 (TOTAL_DATA_FILES.to_string(), "10".to_string()),
633 (TOTAL_DELETE_FILES.to_string(), "5".to_string()),
634 (TOTAL_RECORDS.to_string(), "100".to_string()),
635 (TOTAL_FILE_SIZE.to_string(), "1000".to_string()),
636 (TOTAL_POSITION_DELETES.to_string(), "3".to_string()),
637 (TOTAL_EQUALITY_DELETES.to_string(), "2".to_string()),
638 ]
639 .into_iter()
640 .collect();
641
642 let previous_summary = Summary {
643 operation: Operation::Overwrite,
644 additional_properties: prev_props,
645 };
646
647 let mut new_props = HashMap::new();
648 new_props.insert("dummy".to_string(), "value".to_string());
649 let summary = Summary {
650 operation: Operation::Overwrite,
651 additional_properties: new_props,
652 };
653
654 let truncated = truncate_table_summary(summary, &previous_summary).unwrap();
655
656 assert_eq!(
657 truncated
658 .additional_properties
659 .get(TOTAL_DATA_FILES)
660 .unwrap(),
661 "0"
662 );
663 assert_eq!(
664 truncated
665 .additional_properties
666 .get(TOTAL_DELETE_FILES)
667 .unwrap(),
668 "0"
669 );
670 assert_eq!(
671 truncated.additional_properties.get(TOTAL_RECORDS).unwrap(),
672 "0"
673 );
674 assert_eq!(
675 truncated
676 .additional_properties
677 .get(TOTAL_FILE_SIZE)
678 .unwrap(),
679 "0"
680 );
681 assert_eq!(
682 truncated
683 .additional_properties
684 .get(TOTAL_POSITION_DELETES)
685 .unwrap(),
686 "0"
687 );
688 assert_eq!(
689 truncated
690 .additional_properties
691 .get(TOTAL_EQUALITY_DELETES)
692 .unwrap(),
693 "0"
694 );
695
696 assert_eq!(
697 truncated
698 .additional_properties
699 .get(DELETED_DATA_FILES)
700 .unwrap(),
701 "10"
702 );
703 assert_eq!(
704 truncated
705 .additional_properties
706 .get(REMOVED_DELETE_FILES)
707 .unwrap(),
708 "5"
709 );
710 assert_eq!(
711 truncated
712 .additional_properties
713 .get(DELETED_RECORDS)
714 .unwrap(),
715 "100"
716 );
717 assert_eq!(
718 truncated
719 .additional_properties
720 .get(REMOVED_FILE_SIZE)
721 .unwrap(),
722 "1000"
723 );
724 assert_eq!(
725 truncated
726 .additional_properties
727 .get(REMOVED_POSITION_DELETES)
728 .unwrap(),
729 "3"
730 );
731 assert_eq!(
732 truncated
733 .additional_properties
734 .get(REMOVED_EQUALITY_DELETES)
735 .unwrap(),
736 "2"
737 );
738 }
739
740 #[test]
741 fn test_update_snapshot_summaries_overwrite_truncate_handles_totals_above_i32_max() {
742 let big = (i32::MAX as u64 + 1).to_string(); let prev_props: HashMap<String, String> = [
747 (TOTAL_DATA_FILES.to_string(), big.clone()),
748 (TOTAL_DELETE_FILES.to_string(), "0".to_string()),
749 (TOTAL_RECORDS.to_string(), big.clone()),
750 (TOTAL_FILE_SIZE.to_string(), "0".to_string()),
751 (TOTAL_POSITION_DELETES.to_string(), "0".to_string()),
752 (TOTAL_EQUALITY_DELETES.to_string(), "0".to_string()),
753 ]
754 .into_iter()
755 .collect();
756
757 let previous_summary = Summary {
758 operation: Operation::Overwrite,
759 additional_properties: prev_props,
760 };
761
762 let summary = Summary {
763 operation: Operation::Overwrite,
764 additional_properties: HashMap::new(),
765 };
766
767 let updated = update_snapshot_summaries(summary, Some(&previous_summary), true)
768 .expect("overwrite truncation should accept totals above i32::MAX");
769 assert_eq!(
770 updated
771 .additional_properties
772 .get(DELETED_DATA_FILES)
773 .unwrap(),
774 &big
775 );
776 assert_eq!(
777 updated.additional_properties.get(DELETED_RECORDS).unwrap(),
778 &big
779 );
780 }
781
782 #[test]
783 fn test_update_snapshot_summaries_overwrite_truncate_returns_err_on_malformed_total() {
784 let prev_props: HashMap<String, String> = [
788 (TOTAL_DATA_FILES.to_string(), "not_a_number".to_string()),
789 (TOTAL_DELETE_FILES.to_string(), "0".to_string()),
790 (TOTAL_RECORDS.to_string(), "0".to_string()),
791 (TOTAL_FILE_SIZE.to_string(), "0".to_string()),
792 (TOTAL_POSITION_DELETES.to_string(), "0".to_string()),
793 (TOTAL_EQUALITY_DELETES.to_string(), "0".to_string()),
794 ]
795 .into_iter()
796 .collect();
797
798 let previous_summary = Summary {
799 operation: Operation::Overwrite,
800 additional_properties: prev_props,
801 };
802
803 let summary = Summary {
804 operation: Operation::Overwrite,
805 additional_properties: HashMap::new(),
806 };
807
808 let err = update_snapshot_summaries(summary, Some(&previous_summary), true)
809 .expect_err("malformed previous summary must produce an Err, not a panic");
810 assert!(
811 err.message().contains("truncate table summary"),
812 "expected wrapped 'Failed to truncate table summary' context, got: {}",
813 err.message()
814 );
815 }
816
817 #[test]
818 fn test_snapshot_summary_collector_build() {
819 let schema = Arc::new(
820 Schema::builder()
821 .with_fields(vec![
822 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
823 NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
824 ])
825 .build()
826 .unwrap(),
827 );
828
829 let partition_spec = Arc::new(
830 PartitionSpec::builder(schema.clone())
831 .add_unbound_fields(vec![
832 UnboundPartitionField::builder()
833 .source_id(2)
834 .name("year".to_string())
835 .transform(Transform::Identity)
836 .build(),
837 ])
838 .unwrap()
839 .with_spec_id(1)
840 .build()
841 .unwrap(),
842 );
843
844 let mut collector = SnapshotSummaryCollector::default();
845 collector.set_partition_summary_limit(10);
846
847 let file1 = DataFile {
848 content: DataContentType::Data,
849 file_path: "s3://testbucket/path/to/file1.parquet".to_string(),
850 file_format: DataFileFormat::Parquet,
851 partition: Struct::from_iter(vec![]),
852 record_count: 10,
853 file_size_in_bytes: 100,
854 column_sizes: HashMap::from([(1, 46), (2, 48), (3, 48)]),
855 value_counts: HashMap::from([(1, 10), (2, 10), (3, 10)]),
856 null_value_counts: HashMap::from([(1, 0), (2, 0), (3, 0)]),
857 nan_value_counts: HashMap::new(),
858 lower_bounds: HashMap::from([
859 (1, Datum::long(1)),
860 (2, Datum::string("a")),
861 (3, Datum::string("x")),
862 ]),
863 upper_bounds: HashMap::from([
864 (1, Datum::long(1)),
865 (2, Datum::string("a")),
866 (3, Datum::string("x")),
867 ]),
868 key_metadata: None,
869 split_offsets: Some(vec![4]),
870 equality_ids: None,
871 sort_order_id: Some(0),
872 partition_spec_id: 0,
873 first_row_id: None,
874 referenced_data_file: None,
875 content_offset: None,
876 content_size_in_bytes: None,
877 };
878
879 let file2 = DataFile {
880 content: DataContentType::Data,
881 file_path: "s3://testbucket/path/to/file2.parquet".to_string(),
882 file_format: DataFileFormat::Parquet,
883 partition: Struct::from_iter(vec![Some(Literal::string("2025"))]),
884 record_count: 20,
885 file_size_in_bytes: 200,
886 column_sizes: HashMap::from([(1, 46), (2, 48), (3, 48)]),
887 value_counts: HashMap::from([(1, 20), (2, 20), (3, 20)]),
888 null_value_counts: HashMap::from([(1, 0), (2, 0), (3, 0)]),
889 nan_value_counts: HashMap::new(),
890 lower_bounds: HashMap::from([
891 (1, Datum::long(1)),
892 (2, Datum::string("a")),
893 (3, Datum::string("x")),
894 ]),
895 upper_bounds: HashMap::from([
896 (1, Datum::long(1)),
897 (2, Datum::string("a")),
898 (3, Datum::string("x")),
899 ]),
900 key_metadata: None,
901 split_offsets: Some(vec![4]),
902 equality_ids: None,
903 sort_order_id: Some(0),
904 partition_spec_id: 0,
905 first_row_id: None,
906 referenced_data_file: None,
907 content_offset: None,
908 content_size_in_bytes: None,
909 };
910
911 collector.add_file(&file1, schema.clone(), partition_spec.clone());
912 collector.add_file(&file2, schema.clone(), partition_spec.clone());
913
914 collector.remove_file(&file1, schema.clone(), partition_spec.clone());
915
916 let props = collector.build();
917
918 assert_eq!(props.get(ADDED_FILE_SIZE).unwrap(), "300");
919 assert_eq!(props.get(REMOVED_FILE_SIZE).unwrap(), "100");
920
921 let partition_key = format!("{}{}", CHANGED_PARTITION_PREFIX, "year=2025");
922
923 assert!(props.contains_key(&partition_key));
924
925 let partition_summary = props.get(&partition_key).unwrap();
926 assert!(partition_summary.contains(&format!("{ADDED_FILE_SIZE}=200")));
927 assert!(partition_summary.contains(&format!("{ADDED_DATA_FILES}=1")));
928 assert!(partition_summary.contains(&format!("{ADDED_RECORDS}=20")));
929 }
930
931 #[test]
932 fn test_snapshot_summary_collector_add_manifest() {
933 let mut collector = SnapshotSummaryCollector::default();
934 collector.set_partition_summary_limit(10);
935
936 let manifest = ManifestFile {
937 manifest_path: "file://dummy.manifest".to_string(),
938 manifest_length: 0,
939 partition_spec_id: 0,
940 content: ManifestContentType::Data,
941 sequence_number: 0,
942 min_sequence_number: 0,
943 added_snapshot_id: 0,
944 added_files_count: Some(3),
945 existing_files_count: Some(0),
946 deleted_files_count: Some(1),
947 added_rows_count: Some(100),
948 existing_rows_count: Some(0),
949 deleted_rows_count: Some(50),
950 partitions: Some(Vec::new()),
951 key_metadata: None,
952 first_row_id: None,
953 };
954
955 collector
956 .partition_metrics
957 .insert("dummy".to_string(), UpdateMetrics::default());
958 collector.add_manifest(&manifest);
959
960 let props = collector.build();
961 assert_eq!(props.get(ADDED_DATA_FILES).unwrap(), "3");
962 assert_eq!(props.get(DELETED_DATA_FILES).unwrap(), "1");
963 assert_eq!(props.get(ADDED_RECORDS).unwrap(), "100");
964 assert_eq!(props.get(DELETED_RECORDS).unwrap(), "50");
965 }
966
967 #[test]
968 fn test_snapshot_summary_collector_merge() {
969 let schema = Arc::new(
970 Schema::builder()
971 .with_fields(vec![
972 NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
973 NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
974 ])
975 .build()
976 .unwrap(),
977 );
978
979 let partition_spec = Arc::new(
980 PartitionSpec::builder(schema.clone())
981 .add_unbound_fields(vec![
982 UnboundPartitionField::builder()
983 .source_id(2)
984 .name("year".to_string())
985 .transform(Transform::Identity)
986 .build(),
987 ])
988 .unwrap()
989 .with_spec_id(1)
990 .build()
991 .unwrap(),
992 );
993
994 let mut summary_one = SnapshotSummaryCollector::default();
995 let mut summary_two = SnapshotSummaryCollector::default();
996
997 summary_one.add_file(
998 &DataFile {
999 content: DataContentType::Data,
1000 file_path: "test.parquet".into(),
1001 file_format: DataFileFormat::Parquet,
1002 partition: Struct::from_iter(vec![]),
1003 record_count: 10,
1004 file_size_in_bytes: 100,
1005 column_sizes: HashMap::new(),
1006 value_counts: HashMap::new(),
1007 null_value_counts: HashMap::new(),
1008 nan_value_counts: HashMap::new(),
1009 lower_bounds: HashMap::new(),
1010 upper_bounds: HashMap::new(),
1011 key_metadata: None,
1012 split_offsets: None,
1013 equality_ids: None,
1014 sort_order_id: None,
1015 partition_spec_id: 0,
1016 first_row_id: None,
1017 referenced_data_file: None,
1018 content_offset: None,
1019 content_size_in_bytes: None,
1020 },
1021 schema.clone(),
1022 partition_spec.clone(),
1023 );
1024
1025 summary_two.add_file(
1026 &DataFile {
1027 content: DataContentType::Data,
1028 file_path: "test.parquet".into(),
1029 file_format: DataFileFormat::Parquet,
1030 partition: Struct::from_iter(vec![]),
1031 record_count: 20,
1032 file_size_in_bytes: 200,
1033 column_sizes: HashMap::new(),
1034 value_counts: HashMap::new(),
1035 null_value_counts: HashMap::new(),
1036 nan_value_counts: HashMap::new(),
1037 lower_bounds: HashMap::new(),
1038 upper_bounds: HashMap::new(),
1039 key_metadata: None,
1040 split_offsets: None,
1041 equality_ids: None,
1042 sort_order_id: None,
1043 partition_spec_id: 0,
1044 first_row_id: None,
1045 referenced_data_file: None,
1046 content_offset: None,
1047 content_size_in_bytes: None,
1048 },
1049 schema.clone(),
1050 partition_spec.clone(),
1051 );
1052
1053 summary_one.merge(summary_two);
1054 let props = summary_one.build();
1055 assert_eq!(props.get(ADDED_DATA_FILES).unwrap(), "2");
1056 assert_eq!(props.get(ADDED_RECORDS).unwrap(), "30");
1057
1058 let mut summary_three = SnapshotSummaryCollector::default();
1059 let mut summary_four = SnapshotSummaryCollector::default();
1060
1061 summary_three.add_manifest(&ManifestFile {
1062 manifest_path: "test.manifest".to_string(),
1063 manifest_length: 0,
1064 partition_spec_id: 0,
1065 content: ManifestContentType::Data,
1066 sequence_number: 0,
1067 min_sequence_number: 0,
1068 added_snapshot_id: 0,
1069 added_files_count: Some(1),
1070 existing_files_count: Some(0),
1071 deleted_files_count: Some(0),
1072 added_rows_count: Some(5),
1073 existing_rows_count: Some(0),
1074 deleted_rows_count: Some(0),
1075 partitions: Some(Vec::new()),
1076 key_metadata: None,
1077 first_row_id: None,
1078 });
1079
1080 summary_four.add_file(
1081 &DataFile {
1082 content: DataContentType::Data,
1083 file_path: "test.parquet".into(),
1084 file_format: DataFileFormat::Parquet,
1085 partition: Struct::from_iter(vec![]),
1086 record_count: 1,
1087 file_size_in_bytes: 10,
1088 column_sizes: HashMap::new(),
1089 value_counts: HashMap::new(),
1090 null_value_counts: HashMap::new(),
1091 nan_value_counts: HashMap::new(),
1092 lower_bounds: HashMap::new(),
1093 upper_bounds: HashMap::new(),
1094 key_metadata: None,
1095 split_offsets: None,
1096 equality_ids: None,
1097 sort_order_id: None,
1098 partition_spec_id: 0,
1099 first_row_id: None,
1100 referenced_data_file: None,
1101 content_offset: None,
1102 content_size_in_bytes: None,
1103 },
1104 schema.clone(),
1105 partition_spec.clone(),
1106 );
1107
1108 summary_three.merge(summary_four);
1109 let props = summary_three.build();
1110
1111 assert_eq!(props.get(ADDED_DATA_FILES).unwrap(), "2");
1112 assert_eq!(props.get(ADDED_RECORDS).unwrap(), "6");
1113 assert!(
1114 props
1115 .iter()
1116 .all(|(k, _)| !k.starts_with(CHANGED_PARTITION_PREFIX))
1117 );
1118 }
1119
/// A previous summary that carries only `TOTAL_DATA_FILES` has that total
/// advanced by the added count (8 + 4 = 12); every other total must stay
/// absent from the updated summary.
#[test]
fn test_update_totals_skipped_when_previous_summary_missing_totals() {
    // Converts borrowed key/value pairs into owned summary properties.
    fn owned_props(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|&(key, value)| (key.to_owned(), value.to_owned()))
            .collect()
    }

    // The previous snapshot tracks nothing but the data-file total.
    let previous_summary = Summary {
        operation: Operation::Overwrite,
        additional_properties: owned_props(&[(TOTAL_DATA_FILES, "8")]),
    };

    // The new snapshot reports additions for every category.
    let summary = Summary {
        operation: Operation::Append,
        additional_properties: owned_props(&[
            (ADDED_DATA_FILES, "4"),
            (ADDED_DELETE_FILES, "2"),
            (ADDED_RECORDS, "40"),
            (ADDED_FILE_SIZE, "400"),
            (ADDED_POSITION_DELETES, "5"),
            (ADDED_EQUALITY_DELETES, "3"),
        ]),
    };

    let updated = update_snapshot_summaries(summary, Some(&previous_summary), false).unwrap();
    let props = &updated.additional_properties;

    assert_eq!(props.get(TOTAL_DATA_FILES).unwrap(), "12");

    // Totals the previous summary never had must not appear in the result.
    let absent_totals = [
        TOTAL_DELETE_FILES,
        TOTAL_RECORDS,
        TOTAL_FILE_SIZE,
        TOTAL_POSITION_DELETES,
        TOTAL_EQUALITY_DELETES,
    ];
    for total_field in absent_totals {
        assert!(
            !props.contains_key(total_field),
            "{total_field} should not be set when previous summary lacks it",
        );
    }
}
1167
/// An unparsable `ADDED_DATA_FILES` value must not make the update fail:
/// the data-file total is left out, while the record total is still
/// computed from the parsable values (80 + 40 = 120).
#[test]
fn test_update_totals_tolerates_unparsable_added_value() {
    // Converts borrowed key/value pairs into owned summary properties.
    fn owned_props(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|&(key, value)| (key.to_owned(), value.to_owned()))
            .collect()
    }

    let previous_summary = Summary {
        operation: Operation::Append,
        additional_properties: owned_props(&[(TOTAL_DATA_FILES, "8"), (TOTAL_RECORDS, "80")]),
    };

    // The data-file counter is deliberately not a number.
    let summary = Summary {
        operation: Operation::Append,
        additional_properties: owned_props(&[
            (ADDED_DATA_FILES, "not-a-number"),
            (ADDED_RECORDS, "40"),
        ]),
    };

    let updated = update_snapshot_summaries(summary, Some(&previous_summary), false).unwrap();
    let props = &updated.additional_properties;

    let has_data_file_total = props.contains_key(TOTAL_DATA_FILES);
    assert!(
        !has_data_file_total,
        "TOTAL_DATA_FILES should be skipped when its added value is unparsable",
    );
    assert_eq!(props.get(TOTAL_RECORDS).unwrap(), "120");
}
1206
/// With no previous summary supplied, each asserted total equals the
/// corresponding added value of the new summary.
#[test]
fn test_update_totals_computed_when_no_previous_summary() {
    let summary = Summary {
        operation: Operation::Append,
        additional_properties: [
            (ADDED_DATA_FILES, "4"),
            (ADDED_RECORDS, "40"),
            (ADDED_FILE_SIZE, "400"),
        ]
        .iter()
        .map(|&(name, count)| (name.to_owned(), count.to_owned()))
        .collect::<HashMap<String, String>>(),
    };

    let updated = update_snapshot_summaries(summary, None, false).unwrap();
    let props = &updated.additional_properties;

    // Checked in the same order as the properties were declared above.
    let expected_totals = [
        (TOTAL_DATA_FILES, "4"),
        (TOTAL_RECORDS, "40"),
        (TOTAL_FILE_SIZE, "400"),
    ];
    for (total_key, expected) in expected_totals {
        assert_eq!(props.get(total_key).unwrap(), expected);
    }
}
1230
/// When the new summary contains only removal counters, every total is the
/// previous total reduced by the matching removed amount.
#[test]
fn test_update_totals_with_removes_only() {
    // Converts borrowed key/value pairs into owned summary properties.
    let owned_props = |pairs: &[(&str, &str)]| -> HashMap<String, String> {
        pairs
            .iter()
            .map(|&(key, value)| (key.to_owned(), value.to_owned()))
            .collect()
    };

    let previous_summary = Summary {
        operation: Operation::Overwrite,
        additional_properties: owned_props(&[
            (TOTAL_DATA_FILES, "10"),
            (TOTAL_DELETE_FILES, "5"),
            (TOTAL_RECORDS, "100"),
            (TOTAL_FILE_SIZE, "1000"),
            (TOTAL_POSITION_DELETES, "3"),
            (TOTAL_EQUALITY_DELETES, "2"),
        ]),
    };

    // Only removals are reported; nothing is added by this snapshot.
    let summary = Summary {
        operation: Operation::Delete,
        additional_properties: owned_props(&[
            (DELETED_DATA_FILES, "2"),
            (REMOVED_DELETE_FILES, "1"),
            (DELETED_RECORDS, "20"),
            (REMOVED_FILE_SIZE, "200"),
            (REMOVED_POSITION_DELETES, "1"),
            (REMOVED_EQUALITY_DELETES, "1"),
        ]),
    };

    let updated = update_snapshot_summaries(summary, Some(&previous_summary), false).unwrap();
    let props = &updated.additional_properties;

    // previous total - removed amount, per category.
    let expected_totals = [
        (TOTAL_DATA_FILES, "8"),
        (TOTAL_DELETE_FILES, "4"),
        (TOTAL_RECORDS, "80"),
        (TOTAL_FILE_SIZE, "800"),
        (TOTAL_POSITION_DELETES, "2"),
        (TOTAL_EQUALITY_DELETES, "1"),
    ];
    for (total_key, expected) in expected_totals {
        assert_eq!(props.get(total_key).unwrap(), expected);
    }
}
1277}