1use std::collections::HashMap;
19
20use itertools::Itertools;
21
22use super::{DataContentType, DataFile, PartitionSpecRef};
23use crate::spec::{ManifestContentType, ManifestFile, Operation, SchemaRef, Summary};
24use crate::{Error, ErrorKind, Result};
25
// Per-snapshot counters describing what the snapshot added.
const ADDED_DATA_FILES: &str = "added-data-files";
const ADDED_DELETE_FILES: &str = "added-delete-files";
const ADDED_EQUALITY_DELETE_FILES: &str = "added-equality-delete-files";
const ADDED_POSITION_DELETE_FILES: &str = "added-position-delete-files";
const ADDED_RECORDS: &str = "added-records";
const ADDED_FILE_SIZE: &str = "added-files-size";
const ADDED_POSITION_DELETES: &str = "added-position-deletes";
const ADDED_EQUALITY_DELETES: &str = "added-equality-deletes";

// Per-snapshot counters describing what the snapshot removed.
const DELETED_DATA_FILES: &str = "deleted-data-files";
const REMOVED_DELETE_FILES: &str = "removed-delete-files";
const REMOVED_EQUALITY_DELETE_FILES: &str = "removed-equality-delete-files";
const REMOVED_POSITION_DELETE_FILES: &str = "removed-position-delete-files";
const DELETED_RECORDS: &str = "deleted-records";
const REMOVED_FILE_SIZE: &str = "removed-files-size";
const REMOVED_POSITION_DELETES: &str = "removed-position-deletes";
const REMOVED_EQUALITY_DELETES: &str = "removed-equality-deletes";

// Running table-wide totals, derived from the previous snapshot's totals plus the
// added/removed counters above.
const TOTAL_DATA_FILES: &str = "total-data-files";
const TOTAL_DELETE_FILES: &str = "total-delete-files";
const TOTAL_RECORDS: &str = "total-records";
const TOTAL_FILE_SIZE: &str = "total-files-size";
const TOTAL_POSITION_DELETES: &str = "total-position-deletes";
const TOTAL_EQUALITY_DELETES: &str = "total-equality-deletes";

// Partition-level summary keys: the count of changed partitions, and the prefix that is
// prepended to each partition path.
const CHANGED_PARTITION_COUNT_PROP: &str = "changed-partition-count";
const CHANGED_PARTITION_PREFIX: &str = "partitions.";
50
51#[derive(Default)]
55pub struct SnapshotSummaryCollector {
56 metrics: UpdateMetrics,
57 partition_metrics: HashMap<String, UpdateMetrics>,
58 max_changed_partitions_for_summaries: u64,
59 properties: HashMap<String, String>,
60 trust_partition_metrics: bool,
61}
62
63impl SnapshotSummaryCollector {
64 pub fn set(&mut self, key: &str, value: &str) {
66 self.properties.insert(key.to_string(), value.to_string());
67 }
68
69 pub fn set_partition_summary_limit(&mut self, limit: u64) {
72 self.max_changed_partitions_for_summaries = limit;
73 }
74
75 pub fn add_file(
77 &mut self,
78 data_file: &DataFile,
79 schema: SchemaRef,
80 partition_spec: PartitionSpecRef,
81 ) {
82 self.metrics.add_file(data_file);
83 if !data_file.partition.fields().is_empty() {
84 self.update_partition_metrics(schema, partition_spec, data_file, true);
85 }
86 }
87
88 pub fn remove_file(
90 &mut self,
91 data_file: &DataFile,
92 schema: SchemaRef,
93 partition_spec: PartitionSpecRef,
94 ) {
95 self.metrics.remove_file(data_file);
96 if !data_file.partition.fields().is_empty() {
97 self.update_partition_metrics(schema, partition_spec, data_file, false);
98 }
99 }
100
101 pub fn add_manifest(&mut self, manifest: &ManifestFile) {
103 self.trust_partition_metrics = false;
104 self.partition_metrics.clear();
105 self.metrics.add_manifest(manifest);
106 }
107
108 pub fn update_partition_metrics(
110 &mut self,
111 schema: SchemaRef,
112 partition_spec: PartitionSpecRef,
113 data_file: &DataFile,
114 is_add_file: bool,
115 ) {
116 let partition_path = partition_spec.partition_to_path(&data_file.partition, schema);
117 let metrics = self.partition_metrics.entry(partition_path).or_default();
118
119 if is_add_file {
120 metrics.add_file(data_file);
121 } else {
122 metrics.remove_file(data_file);
123 }
124 }
125
126 pub fn merge(&mut self, summary: SnapshotSummaryCollector) {
128 self.metrics.merge(&summary.metrics);
129 self.properties.extend(summary.properties);
130
131 if self.trust_partition_metrics && summary.trust_partition_metrics {
132 for (partition, partition_metric) in summary.partition_metrics.iter() {
133 self.partition_metrics
134 .entry(partition.to_string())
135 .or_default()
136 .merge(partition_metric);
137 }
138 } else {
139 self.partition_metrics.clear();
140 self.trust_partition_metrics = false;
141 }
142 }
143
144 pub fn build(&self) -> HashMap<String, String> {
146 let mut properties = self.metrics.to_map();
147 let changed_partitions_count = self.partition_metrics.len() as u64;
148 set_if_positive(
149 &mut properties,
150 changed_partitions_count,
151 CHANGED_PARTITION_COUNT_PROP,
152 );
153
154 if changed_partitions_count <= self.max_changed_partitions_for_summaries {
155 for (partition_path, update_metrics_partition) in &self.partition_metrics {
156 let property_key = format!("{CHANGED_PARTITION_PREFIX}{partition_path}");
157 let partition_summary = update_metrics_partition
158 .to_map()
159 .into_iter()
160 .map(|(property, value)| format!("{property}={value}"))
161 .join(",");
162
163 if !partition_summary.is_empty() {
164 properties.insert(property_key, partition_summary);
165 }
166 }
167 }
168 properties
169 }
170}
171
172#[derive(Debug, Default)]
173struct UpdateMetrics {
174 added_file_size: u64,
175 removed_file_size: u64,
176 added_data_files: u32,
177 removed_data_files: u32,
178 added_eq_delete_files: u64,
179 removed_eq_delete_files: u64,
180 added_pos_delete_files: u64,
181 removed_pos_delete_files: u64,
182 added_delete_files: u32,
183 removed_delete_files: u32,
184 added_records: u64,
185 deleted_records: u64,
186 added_pos_deletes: u64,
187 removed_pos_deletes: u64,
188 added_eq_deletes: u64,
189 removed_eq_deletes: u64,
190}
191
192impl UpdateMetrics {
193 fn add_file(&mut self, data_file: &DataFile) {
194 self.added_file_size += data_file.file_size_in_bytes;
195 match data_file.content_type() {
196 DataContentType::Data => {
197 self.added_data_files += 1;
198 self.added_records += data_file.record_count;
199 }
200 DataContentType::PositionDeletes => {
201 self.added_delete_files += 1;
202 self.added_pos_delete_files += 1;
203 self.added_pos_deletes += data_file.record_count;
204 }
205 DataContentType::EqualityDeletes => {
206 self.added_delete_files += 1;
207 self.added_eq_delete_files += 1;
208 self.added_eq_deletes += data_file.record_count;
209 }
210 }
211 }
212
213 fn remove_file(&mut self, data_file: &DataFile) {
214 self.removed_file_size += data_file.file_size_in_bytes;
215 match data_file.content_type() {
216 DataContentType::Data => {
217 self.removed_data_files += 1;
218 self.deleted_records += data_file.record_count;
219 }
220 DataContentType::PositionDeletes => {
221 self.removed_delete_files += 1;
222 self.removed_pos_delete_files += 1;
223 self.removed_pos_deletes += data_file.record_count;
224 }
225 DataContentType::EqualityDeletes => {
226 self.removed_delete_files += 1;
227 self.removed_eq_delete_files += 1;
228 self.removed_eq_deletes += data_file.record_count;
229 }
230 }
231 }
232
233 fn add_manifest(&mut self, manifest: &ManifestFile) {
234 match manifest.content {
235 ManifestContentType::Data => {
236 self.added_data_files += manifest.added_files_count.unwrap_or(0);
237 self.added_records += manifest.added_rows_count.unwrap_or(0);
238 self.removed_data_files += manifest.deleted_files_count.unwrap_or(0);
239 self.deleted_records += manifest.deleted_rows_count.unwrap_or(0);
240 }
241 ManifestContentType::Deletes => {
242 self.added_delete_files += manifest.added_files_count.unwrap_or(0);
243 self.removed_delete_files += manifest.deleted_files_count.unwrap_or(0);
244 }
245 }
246 }
247
248 fn to_map(&self) -> HashMap<String, String> {
249 let mut properties = HashMap::new();
250 set_if_positive(&mut properties, self.added_file_size, ADDED_FILE_SIZE);
251 set_if_positive(&mut properties, self.removed_file_size, REMOVED_FILE_SIZE);
252 set_if_positive(&mut properties, self.added_data_files, ADDED_DATA_FILES);
253 set_if_positive(&mut properties, self.removed_data_files, DELETED_DATA_FILES);
254 set_if_positive(
255 &mut properties,
256 self.added_eq_delete_files,
257 ADDED_EQUALITY_DELETE_FILES,
258 );
259 set_if_positive(
260 &mut properties,
261 self.removed_eq_delete_files,
262 REMOVED_EQUALITY_DELETE_FILES,
263 );
264 set_if_positive(
265 &mut properties,
266 self.added_pos_delete_files,
267 ADDED_POSITION_DELETE_FILES,
268 );
269 set_if_positive(
270 &mut properties,
271 self.removed_pos_delete_files,
272 REMOVED_POSITION_DELETE_FILES,
273 );
274 set_if_positive(&mut properties, self.added_delete_files, ADDED_DELETE_FILES);
275 set_if_positive(
276 &mut properties,
277 self.removed_delete_files,
278 REMOVED_DELETE_FILES,
279 );
280 set_if_positive(&mut properties, self.added_records, ADDED_RECORDS);
281 set_if_positive(&mut properties, self.deleted_records, DELETED_RECORDS);
282 set_if_positive(
283 &mut properties,
284 self.added_pos_deletes,
285 ADDED_POSITION_DELETES,
286 );
287 set_if_positive(
288 &mut properties,
289 self.removed_pos_deletes,
290 REMOVED_POSITION_DELETES,
291 );
292 set_if_positive(
293 &mut properties,
294 self.added_eq_deletes,
295 ADDED_EQUALITY_DELETES,
296 );
297 set_if_positive(
298 &mut properties,
299 self.removed_eq_deletes,
300 REMOVED_EQUALITY_DELETES,
301 );
302 properties
303 }
304
305 fn merge(&mut self, other: &UpdateMetrics) {
306 self.added_file_size += other.added_file_size;
307 self.removed_file_size += other.removed_file_size;
308 self.added_data_files += other.added_data_files;
309 self.removed_data_files += other.removed_data_files;
310 self.added_eq_delete_files += other.added_eq_delete_files;
311 self.removed_eq_delete_files += other.removed_eq_delete_files;
312 self.added_pos_delete_files += other.added_pos_delete_files;
313 self.removed_pos_delete_files += other.removed_pos_delete_files;
314 self.added_delete_files += other.added_delete_files;
315 self.removed_delete_files += other.removed_delete_files;
316 self.added_records += other.added_records;
317 self.deleted_records += other.deleted_records;
318 self.added_pos_deletes += other.added_pos_deletes;
319 self.removed_pos_deletes += other.removed_pos_deletes;
320 self.added_eq_deletes += other.added_eq_deletes;
321 self.removed_eq_deletes += other.removed_eq_deletes;
322 }
323}
324
/// Stores `value` (via `ToString`) under `property_name`.
///
/// It is stored only when `value` compares strictly greater than `T::default()`, i.e. zero
/// for numeric types. Zero, negative and incomparable values (such as NaN) leave
/// `properties` untouched.
fn set_if_positive<T>(properties: &mut HashMap<String, String>, value: T, property_name: &str)
where
    T: PartialOrd + Default + ToString,
{
    let baseline = T::default();
    if let Some(std::cmp::Ordering::Greater) = value.partial_cmp(&baseline) {
        properties.insert(String::from(property_name), value.to_string());
    }
}
331
332pub(crate) fn update_snapshot_summaries(
333 summary: Summary,
334 previous_summary: Option<&Summary>,
335 truncate_full_table: bool,
336) -> Result<Summary> {
337 if summary.operation != Operation::Append
339 && summary.operation != Operation::Overwrite
340 && summary.operation != Operation::Delete
341 {
342 return Err(Error::new(
343 ErrorKind::DataInvalid,
344 "Operation is not supported.",
345 ));
346 }
347
348 let mut summary = match previous_summary {
349 Some(prev_summary) if truncate_full_table && summary.operation == Operation::Overwrite => {
350 truncate_table_summary(summary, prev_summary).map_err(|err| {
351 Error::new(ErrorKind::Unexpected, "Failed to truncate table summary.")
352 .with_source(err)
353 })?
354 }
355 _ => summary,
356 };
357
358 update_totals(
359 &mut summary,
360 previous_summary,
361 TOTAL_DATA_FILES,
362 ADDED_DATA_FILES,
363 DELETED_DATA_FILES,
364 );
365
366 update_totals(
367 &mut summary,
368 previous_summary,
369 TOTAL_DELETE_FILES,
370 ADDED_DELETE_FILES,
371 REMOVED_DELETE_FILES,
372 );
373
374 update_totals(
375 &mut summary,
376 previous_summary,
377 TOTAL_RECORDS,
378 ADDED_RECORDS,
379 DELETED_RECORDS,
380 );
381
382 update_totals(
383 &mut summary,
384 previous_summary,
385 TOTAL_FILE_SIZE,
386 ADDED_FILE_SIZE,
387 REMOVED_FILE_SIZE,
388 );
389
390 update_totals(
391 &mut summary,
392 previous_summary,
393 TOTAL_POSITION_DELETES,
394 ADDED_POSITION_DELETES,
395 REMOVED_POSITION_DELETES,
396 );
397
398 update_totals(
399 &mut summary,
400 previous_summary,
401 TOTAL_EQUALITY_DELETES,
402 ADDED_EQUALITY_DELETES,
403 REMOVED_EQUALITY_DELETES,
404 );
405 Ok(summary)
406}
407
408fn get_prop(previous_summary: &Summary, prop: &str) -> Result<u64> {
409 let value_str = previous_summary
410 .additional_properties
411 .get(prop)
412 .map(String::as_str)
413 .unwrap_or("0");
414 value_str.parse::<u64>().map_err(|err| {
415 Error::new(
416 ErrorKind::Unexpected,
417 format!("Failed to parse summary property '{prop}' value '{value_str}' as u64."),
418 )
419 .with_source(err)
420 })
421}
422
423fn truncate_table_summary(mut summary: Summary, previous_summary: &Summary) -> Result<Summary> {
424 for prop in [
425 TOTAL_DATA_FILES,
426 TOTAL_DELETE_FILES,
427 TOTAL_RECORDS,
428 TOTAL_FILE_SIZE,
429 TOTAL_POSITION_DELETES,
430 TOTAL_EQUALITY_DELETES,
431 ] {
432 summary
433 .additional_properties
434 .insert(prop.to_string(), "0".to_string());
435 }
436
437 let value = get_prop(previous_summary, TOTAL_DATA_FILES)?;
438 if value != 0 {
439 summary
440 .additional_properties
441 .insert(DELETED_DATA_FILES.to_string(), value.to_string());
442 }
443 let value = get_prop(previous_summary, TOTAL_DELETE_FILES)?;
444 if value != 0 {
445 summary
446 .additional_properties
447 .insert(REMOVED_DELETE_FILES.to_string(), value.to_string());
448 }
449 let value = get_prop(previous_summary, TOTAL_RECORDS)?;
450 if value != 0 {
451 summary
452 .additional_properties
453 .insert(DELETED_RECORDS.to_string(), value.to_string());
454 }
455 let value = get_prop(previous_summary, TOTAL_FILE_SIZE)?;
456 if value != 0 {
457 summary
458 .additional_properties
459 .insert(REMOVED_FILE_SIZE.to_string(), value.to_string());
460 }
461
462 let value = get_prop(previous_summary, TOTAL_POSITION_DELETES)?;
463 if value != 0 {
464 summary
465 .additional_properties
466 .insert(REMOVED_POSITION_DELETES.to_string(), value.to_string());
467 }
468
469 let value = get_prop(previous_summary, TOTAL_EQUALITY_DELETES)?;
470 if value != 0 {
471 summary
472 .additional_properties
473 .insert(REMOVED_EQUALITY_DELETES.to_string(), value.to_string());
474 }
475
476 Ok(summary)
477}
478
479fn update_totals(
480 summary: &mut Summary,
481 previous_summary: Option<&Summary>,
482 total_property: &str,
483 added_property: &str,
484 removed_property: &str,
485) {
486 let previous_total = match previous_summary {
487 None => 0,
488 Some(prev_summary) => match prev_summary.additional_properties.get(total_property) {
489 Some(value_str) => match value_str.parse::<u64>() {
490 Ok(v) => v,
491 Err(parse_err) => {
492 tracing::warn!(
493 "Property '{total_property}' could not be parsed in the previous snapshot summary: {parse_err}. \
494 Skipping total computation.",
495 );
496 return;
497 }
498 },
499 None => {
500 tracing::debug!(
501 "Property '{total_property}' was not set in the previous snapshot summary. \
502 Skipping total computation."
503 );
504 return;
505 }
506 },
507 };
508
509 let added = summary
510 .additional_properties
511 .get(added_property)
512 .map_or(0, |value| {
513 value
514 .parse::<u64>()
515 .expect("must be parsable as it was just serialized")
516 });
517 let removed = summary
518 .additional_properties
519 .get(removed_property)
520 .map_or(0, |value| {
521 value
522 .parse::<u64>()
523 .expect("must be parsable as it was just serialized")
524 });
525
526 let new_total = previous_total + added - removed;
527 summary
528 .additional_properties
529 .insert(total_property.to_string(), new_total.to_string());
530}
531
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use super::*;
    use crate::spec::{
        DataFileFormat, Literal, NestedField, PartitionSpec, PrimitiveType, Schema, Struct,
        Transform, Type, UnboundPartitionField,
    };

    /// Totals shared by several "previous snapshot" fixtures.
    const PREVIOUS_TOTALS: &[(&str, &str)] = &[
        (TOTAL_DATA_FILES, "10"),
        (TOTAL_DELETE_FILES, "5"),
        (TOTAL_RECORDS, "100"),
        (TOTAL_FILE_SIZE, "1000"),
        (TOTAL_POSITION_DELETES, "3"),
        (TOTAL_EQUALITY_DELETES, "2"),
    ];

    /// Builds an owned property map from `(key, value)` string slices.
    fn props(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|&(key, value)| (key.to_string(), value.to_string()))
            .collect()
    }

    /// Builds a `Summary` for `operation` with the given properties.
    fn make_summary(operation: Operation, pairs: &[(&str, &str)]) -> Summary {
        Summary {
            operation,
            additional_properties: props(pairs),
        }
    }

    /// Asserts that `props[key] == expected`, naming the key on failure.
    fn assert_prop(props: &HashMap<String, String>, key: &str, expected: &str) {
        assert_eq!(
            props.get(key).map(String::as_str),
            Some(expected),
            "property {key}"
        );
    }

    /// Schema `(id: int, name: string)` plus a spec with an identity partition on `name`
    /// named `year`.
    fn schema_and_partition_spec() -> (Arc<Schema>, Arc<PartitionSpec>) {
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::required(2, "name", Type::Primitive(PrimitiveType::String))
                        .into(),
                ])
                .build()
                .unwrap(),
        );

        let partition_spec = Arc::new(
            PartitionSpec::builder(schema.clone())
                .add_unbound_fields(vec![
                    UnboundPartitionField::builder()
                        .source_id(2)
                        .name("year".to_string())
                        .transform(Transform::Identity)
                        .build(),
                ])
                .unwrap()
                .with_spec_id(1)
                .build()
                .unwrap(),
        );

        (schema, partition_spec)
    }

    /// Builds a Parquet `Data` file without column statistics (they do not affect summaries).
    fn data_file(
        file_path: &str,
        partition: Struct,
        record_count: u64,
        file_size_in_bytes: u64,
    ) -> DataFile {
        DataFile {
            content: DataContentType::Data,
            file_path: file_path.to_string(),
            file_format: DataFileFormat::Parquet,
            partition,
            record_count,
            file_size_in_bytes,
            column_sizes: HashMap::new(),
            value_counts: HashMap::new(),
            null_value_counts: HashMap::new(),
            nan_value_counts: HashMap::new(),
            lower_bounds: HashMap::new(),
            upper_bounds: HashMap::new(),
            key_metadata: None,
            split_offsets: None,
            equality_ids: None,
            sort_order_id: None,
            partition_spec_id: 0,
            first_row_id: None,
            referenced_data_file: None,
            content_offset: None,
            content_size_in_bytes: None,
        }
    }

    /// Builds a data manifest carrying only the given added/deleted file and row counts.
    fn data_manifest(
        manifest_path: &str,
        added_files: u32,
        deleted_files: u32,
        added_rows: u64,
        deleted_rows: u64,
    ) -> ManifestFile {
        ManifestFile {
            manifest_path: manifest_path.to_string(),
            manifest_length: 0,
            partition_spec_id: 0,
            content: ManifestContentType::Data,
            sequence_number: 0,
            min_sequence_number: 0,
            added_snapshot_id: 0,
            added_files_count: Some(added_files),
            existing_files_count: Some(0),
            deleted_files_count: Some(deleted_files),
            added_rows_count: Some(added_rows),
            existing_rows_count: Some(0),
            deleted_rows_count: Some(deleted_rows),
            partitions: Some(Vec::new()),
            key_metadata: None,
            first_row_id: None,
        }
    }

    #[test]
    fn test_update_snapshot_summaries_append() {
        let previous_summary = make_summary(Operation::Append, PREVIOUS_TOTALS);
        let summary = make_summary(Operation::Append, &[
            (ADDED_DATA_FILES, "4"),
            (DELETED_DATA_FILES, "1"),
            (ADDED_DELETE_FILES, "2"),
            (REMOVED_DELETE_FILES, "1"),
            (ADDED_RECORDS, "40"),
            (DELETED_RECORDS, "10"),
            (ADDED_FILE_SIZE, "400"),
            (REMOVED_FILE_SIZE, "100"),
            (ADDED_POSITION_DELETES, "5"),
            (REMOVED_POSITION_DELETES, "2"),
            (ADDED_EQUALITY_DELETES, "3"),
            (REMOVED_EQUALITY_DELETES, "1"),
        ]);

        let updated = update_snapshot_summaries(summary, Some(&previous_summary), false).unwrap();
        let props = &updated.additional_properties;

        assert_prop(props, TOTAL_DATA_FILES, "13");
        assert_prop(props, TOTAL_DELETE_FILES, "6");
        assert_prop(props, TOTAL_RECORDS, "130");
        assert_prop(props, TOTAL_FILE_SIZE, "1300");
        assert_prop(props, TOTAL_POSITION_DELETES, "6");
        assert_prop(props, TOTAL_EQUALITY_DELETES, "4");
    }

    #[test]
    fn test_truncate_table_summary() {
        let previous_summary = make_summary(Operation::Overwrite, PREVIOUS_TOTALS);
        let summary = make_summary(Operation::Overwrite, &[("dummy", "value")]);

        let truncated = truncate_table_summary(summary, &previous_summary).unwrap();
        let props = &truncated.additional_properties;

        for total in [
            TOTAL_DATA_FILES,
            TOTAL_DELETE_FILES,
            TOTAL_RECORDS,
            TOTAL_FILE_SIZE,
            TOTAL_POSITION_DELETES,
            TOTAL_EQUALITY_DELETES,
        ] {
            assert_prop(props, total, "0");
        }

        assert_prop(props, DELETED_DATA_FILES, "10");
        assert_prop(props, REMOVED_DELETE_FILES, "5");
        assert_prop(props, DELETED_RECORDS, "100");
        assert_prop(props, REMOVED_FILE_SIZE, "1000");
        assert_prop(props, REMOVED_POSITION_DELETES, "3");
        assert_prop(props, REMOVED_EQUALITY_DELETES, "2");
    }

    #[test]
    fn test_update_snapshot_summaries_overwrite_truncate_handles_totals_above_i32_max() {
        let big = (i32::MAX as u64 + 1).to_string();
        let previous_summary = make_summary(Operation::Overwrite, &[
            (TOTAL_DATA_FILES, big.as_str()),
            (TOTAL_DELETE_FILES, "0"),
            (TOTAL_RECORDS, big.as_str()),
            (TOTAL_FILE_SIZE, "0"),
            (TOTAL_POSITION_DELETES, "0"),
            (TOTAL_EQUALITY_DELETES, "0"),
        ]);
        let summary = make_summary(Operation::Overwrite, &[]);

        let updated = update_snapshot_summaries(summary, Some(&previous_summary), true)
            .expect("overwrite truncation should accept totals above i32::MAX");
        let props = &updated.additional_properties;

        assert_prop(props, DELETED_DATA_FILES, &big);
        assert_prop(props, DELETED_RECORDS, &big);
    }

    #[test]
    fn test_update_snapshot_summaries_overwrite_truncate_returns_err_on_malformed_total() {
        let previous_summary = make_summary(Operation::Overwrite, &[
            (TOTAL_DATA_FILES, "not_a_number"),
            (TOTAL_DELETE_FILES, "0"),
            (TOTAL_RECORDS, "0"),
            (TOTAL_FILE_SIZE, "0"),
            (TOTAL_POSITION_DELETES, "0"),
            (TOTAL_EQUALITY_DELETES, "0"),
        ]);
        let summary = make_summary(Operation::Overwrite, &[]);

        let err = update_snapshot_summaries(summary, Some(&previous_summary), true)
            .expect_err("malformed previous summary must produce an Err, not a panic");
        assert!(
            err.message().contains("truncate table summary"),
            "expected wrapped 'Failed to truncate table summary' context, got: {}",
            err.message()
        );
    }

    #[test]
    fn test_snapshot_summary_collector_build() {
        let (schema, partition_spec) = schema_and_partition_spec();

        let mut collector = SnapshotSummaryCollector::default();
        collector.set_partition_summary_limit(10);

        let unpartitioned = data_file(
            "s3://testbucket/path/to/file1.parquet",
            Struct::from_iter(vec![]),
            10,
            100,
        );
        let partitioned = data_file(
            "s3://testbucket/path/to/file2.parquet",
            Struct::from_iter(vec![Some(Literal::string("2025"))]),
            20,
            200,
        );

        collector.add_file(&unpartitioned, schema.clone(), partition_spec.clone());
        collector.add_file(&partitioned, schema.clone(), partition_spec.clone());
        collector.remove_file(&unpartitioned, schema.clone(), partition_spec.clone());

        let props = collector.build();

        assert_prop(&props, ADDED_FILE_SIZE, "300");
        assert_prop(&props, REMOVED_FILE_SIZE, "100");

        let partition_key = format!("{CHANGED_PARTITION_PREFIX}year=2025");
        let partition_summary = props
            .get(&partition_key)
            .unwrap_or_else(|| panic!("missing {partition_key}"));
        assert!(partition_summary.contains(&format!("{ADDED_FILE_SIZE}=200")));
        assert!(partition_summary.contains(&format!("{ADDED_DATA_FILES}=1")));
        assert!(partition_summary.contains(&format!("{ADDED_RECORDS}=20")));
    }

    #[test]
    fn test_snapshot_summary_collector_add_manifest() {
        let mut collector = SnapshotSummaryCollector::default();
        collector.set_partition_summary_limit(10);
        collector
            .partition_metrics
            .insert("dummy".to_string(), UpdateMetrics::default());

        collector.add_manifest(&data_manifest("file://dummy.manifest", 3, 1, 100, 50));

        // Manifest-level counts have no partition breakdown, so partition metrics are dropped.
        assert!(collector.partition_metrics.is_empty());

        let props = collector.build();
        assert_prop(&props, ADDED_DATA_FILES, "3");
        assert_prop(&props, DELETED_DATA_FILES, "1");
        assert_prop(&props, ADDED_RECORDS, "100");
        assert_prop(&props, DELETED_RECORDS, "50");
        assert!(!props.contains_key(CHANGED_PARTITION_COUNT_PROP));
    }

    #[test]
    fn test_snapshot_summary_collector_merge() {
        let (schema, partition_spec) = schema_and_partition_spec();

        let mut summary_one = SnapshotSummaryCollector::default();
        let mut summary_two = SnapshotSummaryCollector::default();

        summary_one.add_file(
            &data_file("test.parquet", Struct::from_iter(vec![]), 10, 100),
            schema.clone(),
            partition_spec.clone(),
        );
        summary_two.add_file(
            &data_file("test.parquet", Struct::from_iter(vec![]), 20, 200),
            schema.clone(),
            partition_spec.clone(),
        );

        summary_one.merge(summary_two);
        let props = summary_one.build();
        assert_prop(&props, ADDED_DATA_FILES, "2");
        assert_prop(&props, ADDED_RECORDS, "30");

        let mut summary_three = SnapshotSummaryCollector::default();
        let mut summary_four = SnapshotSummaryCollector::default();

        summary_three.add_manifest(&data_manifest("test.manifest", 1, 0, 5, 0));
        summary_four.add_file(
            &data_file("test.parquet", Struct::from_iter(vec![]), 1, 10),
            schema.clone(),
            partition_spec.clone(),
        );

        summary_three.merge(summary_four);
        let props = summary_three.build();

        assert_prop(&props, ADDED_DATA_FILES, "2");
        assert_prop(&props, ADDED_RECORDS, "6");
        assert!(
            props
                .keys()
                .all(|key| !key.starts_with(CHANGED_PARTITION_PREFIX))
        );
    }

    #[test]
    fn test_update_totals_skipped_when_previous_summary_missing_totals() {
        let previous_summary = make_summary(Operation::Overwrite, &[(TOTAL_DATA_FILES, "8")]);
        let summary = make_summary(Operation::Append, &[
            (ADDED_DATA_FILES, "4"),
            (ADDED_DELETE_FILES, "2"),
            (ADDED_RECORDS, "40"),
            (ADDED_FILE_SIZE, "400"),
            (ADDED_POSITION_DELETES, "5"),
            (ADDED_EQUALITY_DELETES, "3"),
        ]);

        let updated = update_snapshot_summaries(summary, Some(&previous_summary), false).unwrap();
        let props = &updated.additional_properties;

        assert_prop(props, TOTAL_DATA_FILES, "12");

        for total_field in [
            TOTAL_DELETE_FILES,
            TOTAL_RECORDS,
            TOTAL_FILE_SIZE,
            TOTAL_POSITION_DELETES,
            TOTAL_EQUALITY_DELETES,
        ] {
            assert!(
                !props.contains_key(total_field),
                "{total_field} should not be set when previous summary lacks it",
            );
        }
    }

    #[test]
    fn test_update_totals_computed_when_no_previous_summary() {
        let summary = make_summary(Operation::Append, &[
            (ADDED_DATA_FILES, "4"),
            (ADDED_RECORDS, "40"),
            (ADDED_FILE_SIZE, "400"),
        ]);

        let updated = update_snapshot_summaries(summary, None, false).unwrap();
        let props = &updated.additional_properties;

        assert_prop(props, TOTAL_DATA_FILES, "4");
        assert_prop(props, TOTAL_RECORDS, "40");
        assert_prop(props, TOTAL_FILE_SIZE, "400");
    }

    #[test]
    fn test_update_totals_with_removes_only() {
        let previous_summary = make_summary(Operation::Overwrite, PREVIOUS_TOTALS);
        let summary = make_summary(Operation::Delete, &[
            (DELETED_DATA_FILES, "2"),
            (REMOVED_DELETE_FILES, "1"),
            (DELETED_RECORDS, "20"),
            (REMOVED_FILE_SIZE, "200"),
            (REMOVED_POSITION_DELETES, "1"),
            (REMOVED_EQUALITY_DELETES, "1"),
        ]);

        let updated = update_snapshot_summaries(summary, Some(&previous_summary), false).unwrap();
        let props = &updated.additional_properties;

        assert_prop(props, TOTAL_DATA_FILES, "8");
        assert_prop(props, TOTAL_DELETE_FILES, "4");
        assert_prop(props, TOTAL_RECORDS, "80");
        assert_prop(props, TOTAL_FILE_SIZE, "800");
        assert_prop(props, TOTAL_POSITION_DELETES, "2");
        assert_prop(props, TOTAL_EQUALITY_DELETES, "1");
    }
}