1use std::collections::HashMap;
19
20use itertools::Itertools;
21
22use super::{DataContentType, DataFile, PartitionSpecRef};
23use crate::spec::{ManifestContentType, ManifestFile, Operation, SchemaRef, Summary};
24use crate::{Error, ErrorKind, Result};
25
// Keys of the snapshot summary properties read and written by this module.

// What an update added.
const ADDED_DATA_FILES: &str = "added-data-files";
const ADDED_DELETE_FILES: &str = "added-delete-files";
const ADDED_EQUALITY_DELETE_FILES: &str = "added-equality-delete-files";
const ADDED_EQUALITY_DELETES: &str = "added-equality-deletes";
const ADDED_FILE_SIZE: &str = "added-files-size";
const ADDED_POSITION_DELETE_FILES: &str = "added-position-delete-files";
const ADDED_POSITION_DELETES: &str = "added-position-deletes";
const ADDED_RECORDS: &str = "added-records";

// What an update removed. Data files and records use a `deleted-` prefix, everything else
// uses `removed-`.
const DELETED_DATA_FILES: &str = "deleted-data-files";
const DELETED_RECORDS: &str = "deleted-records";
const REMOVED_DELETE_FILES: &str = "removed-delete-files";
const REMOVED_EQUALITY_DELETE_FILES: &str = "removed-equality-delete-files";
const REMOVED_EQUALITY_DELETES: &str = "removed-equality-deletes";
const REMOVED_FILE_SIZE: &str = "removed-files-size";
const REMOVED_POSITION_DELETE_FILES: &str = "removed-position-delete-files";
const REMOVED_POSITION_DELETES: &str = "removed-position-deletes";

// Running totals, recomputed from the previous snapshot's totals plus this update's
// additions minus its removals.
const TOTAL_DATA_FILES: &str = "total-data-files";
const TOTAL_DELETE_FILES: &str = "total-delete-files";
const TOTAL_EQUALITY_DELETES: &str = "total-equality-deletes";
const TOTAL_FILE_SIZE: &str = "total-files-size";
const TOTAL_POSITION_DELETES: &str = "total-position-deletes";
const TOTAL_RECORDS: &str = "total-records";

// Partition-level reporting: the number of changed partitions, and the prefix of the
// per-partition summary keys (`partitions.<partition path>`).
const CHANGED_PARTITION_COUNT_PROP: &str = "changed-partition-count";
const CHANGED_PARTITION_PREFIX: &str = "partitions.";
50
51#[derive(Default)]
55pub struct SnapshotSummaryCollector {
56 metrics: UpdateMetrics,
57 partition_metrics: HashMap<String, UpdateMetrics>,
58 max_changed_partitions_for_summaries: u64,
59 properties: HashMap<String, String>,
60 trust_partition_metrics: bool,
61}
62
63impl SnapshotSummaryCollector {
64 pub fn set(&mut self, key: &str, value: &str) {
66 self.properties.insert(key.to_string(), value.to_string());
67 }
68
69 pub fn set_partition_summary_limit(&mut self, limit: u64) {
72 self.max_changed_partitions_for_summaries = limit;
73 }
74
75 pub fn add_file(
77 &mut self,
78 data_file: &DataFile,
79 schema: SchemaRef,
80 partition_spec: PartitionSpecRef,
81 ) {
82 self.metrics.add_file(data_file);
83 if !data_file.partition.fields().is_empty() {
84 self.update_partition_metrics(schema, partition_spec, data_file, true);
85 }
86 }
87
88 pub fn remove_file(
90 &mut self,
91 data_file: &DataFile,
92 schema: SchemaRef,
93 partition_spec: PartitionSpecRef,
94 ) {
95 self.metrics.remove_file(data_file);
96 if !data_file.partition.fields().is_empty() {
97 self.update_partition_metrics(schema, partition_spec, data_file, false);
98 }
99 }
100
101 pub fn add_manifest(&mut self, manifest: &ManifestFile) {
103 self.trust_partition_metrics = false;
104 self.partition_metrics.clear();
105 self.metrics.add_manifest(manifest);
106 }
107
108 pub fn update_partition_metrics(
110 &mut self,
111 schema: SchemaRef,
112 partition_spec: PartitionSpecRef,
113 data_file: &DataFile,
114 is_add_file: bool,
115 ) {
116 let partition_path = partition_spec.partition_to_path(&data_file.partition, schema);
117 let metrics = self.partition_metrics.entry(partition_path).or_default();
118
119 if is_add_file {
120 metrics.add_file(data_file);
121 } else {
122 metrics.remove_file(data_file);
123 }
124 }
125
126 pub fn merge(&mut self, summary: SnapshotSummaryCollector) {
128 self.metrics.merge(&summary.metrics);
129 self.properties.extend(summary.properties);
130
131 if self.trust_partition_metrics && summary.trust_partition_metrics {
132 for (partition, partition_metric) in summary.partition_metrics.iter() {
133 self.partition_metrics
134 .entry(partition.to_string())
135 .or_default()
136 .merge(partition_metric);
137 }
138 } else {
139 self.partition_metrics.clear();
140 self.trust_partition_metrics = false;
141 }
142 }
143
144 pub fn build(&self) -> HashMap<String, String> {
146 let mut properties = self.metrics.to_map();
147 let changed_partitions_count = self.partition_metrics.len() as u64;
148 set_if_positive(
149 &mut properties,
150 changed_partitions_count,
151 CHANGED_PARTITION_COUNT_PROP,
152 );
153
154 if changed_partitions_count <= self.max_changed_partitions_for_summaries {
155 for (partition_path, update_metrics_partition) in &self.partition_metrics {
156 let property_key = format!("{CHANGED_PARTITION_PREFIX}{partition_path}");
157 let partition_summary = update_metrics_partition
158 .to_map()
159 .into_iter()
160 .map(|(property, value)| format!("{property}={value}"))
161 .join(",");
162
163 if !partition_summary.is_empty() {
164 properties.insert(property_key, partition_summary);
165 }
166 }
167 }
168 properties
169 }
170}
171
172#[derive(Debug, Default)]
173struct UpdateMetrics {
174 added_file_size: u64,
175 removed_file_size: u64,
176 added_data_files: u32,
177 removed_data_files: u32,
178 added_eq_delete_files: u64,
179 removed_eq_delete_files: u64,
180 added_pos_delete_files: u64,
181 removed_pos_delete_files: u64,
182 added_delete_files: u32,
183 removed_delete_files: u32,
184 added_records: u64,
185 deleted_records: u64,
186 added_pos_deletes: u64,
187 removed_pos_deletes: u64,
188 added_eq_deletes: u64,
189 removed_eq_deletes: u64,
190}
191
192impl UpdateMetrics {
193 fn add_file(&mut self, data_file: &DataFile) {
194 self.added_file_size += data_file.file_size_in_bytes;
195 match data_file.content_type() {
196 DataContentType::Data => {
197 self.added_data_files += 1;
198 self.added_records += data_file.record_count;
199 }
200 DataContentType::PositionDeletes => {
201 self.added_delete_files += 1;
202 self.added_pos_delete_files += 1;
203 self.added_pos_deletes += data_file.record_count;
204 }
205 DataContentType::EqualityDeletes => {
206 self.added_delete_files += 1;
207 self.added_eq_delete_files += 1;
208 self.added_eq_deletes += data_file.record_count;
209 }
210 }
211 }
212
213 fn remove_file(&mut self, data_file: &DataFile) {
214 self.removed_file_size += data_file.file_size_in_bytes;
215 match data_file.content_type() {
216 DataContentType::Data => {
217 self.removed_data_files += 1;
218 self.deleted_records += data_file.record_count;
219 }
220 DataContentType::PositionDeletes => {
221 self.removed_delete_files += 1;
222 self.removed_pos_delete_files += 1;
223 self.removed_pos_deletes += data_file.record_count;
224 }
225 DataContentType::EqualityDeletes => {
226 self.removed_delete_files += 1;
227 self.removed_eq_delete_files += 1;
228 self.removed_eq_deletes += data_file.record_count;
229 }
230 }
231 }
232
233 fn add_manifest(&mut self, manifest: &ManifestFile) {
234 match manifest.content {
235 ManifestContentType::Data => {
236 self.added_data_files += manifest.added_files_count.unwrap_or(0);
237 self.added_records += manifest.added_rows_count.unwrap_or(0);
238 self.removed_data_files += manifest.deleted_files_count.unwrap_or(0);
239 self.deleted_records += manifest.deleted_rows_count.unwrap_or(0);
240 }
241 ManifestContentType::Deletes => {
242 self.added_delete_files += manifest.added_files_count.unwrap_or(0);
243 self.removed_delete_files += manifest.deleted_files_count.unwrap_or(0);
244 }
245 }
246 }
247
248 fn to_map(&self) -> HashMap<String, String> {
249 let mut properties = HashMap::new();
250 set_if_positive(&mut properties, self.added_file_size, ADDED_FILE_SIZE);
251 set_if_positive(&mut properties, self.removed_file_size, REMOVED_FILE_SIZE);
252 set_if_positive(&mut properties, self.added_data_files, ADDED_DATA_FILES);
253 set_if_positive(&mut properties, self.removed_data_files, DELETED_DATA_FILES);
254 set_if_positive(
255 &mut properties,
256 self.added_eq_delete_files,
257 ADDED_EQUALITY_DELETE_FILES,
258 );
259 set_if_positive(
260 &mut properties,
261 self.removed_eq_delete_files,
262 REMOVED_EQUALITY_DELETE_FILES,
263 );
264 set_if_positive(
265 &mut properties,
266 self.added_pos_delete_files,
267 ADDED_POSITION_DELETE_FILES,
268 );
269 set_if_positive(
270 &mut properties,
271 self.removed_pos_delete_files,
272 REMOVED_POSITION_DELETE_FILES,
273 );
274 set_if_positive(&mut properties, self.added_delete_files, ADDED_DELETE_FILES);
275 set_if_positive(
276 &mut properties,
277 self.removed_delete_files,
278 REMOVED_DELETE_FILES,
279 );
280 set_if_positive(&mut properties, self.added_records, ADDED_RECORDS);
281 set_if_positive(&mut properties, self.deleted_records, DELETED_RECORDS);
282 set_if_positive(
283 &mut properties,
284 self.added_pos_deletes,
285 ADDED_POSITION_DELETES,
286 );
287 set_if_positive(
288 &mut properties,
289 self.removed_pos_deletes,
290 REMOVED_POSITION_DELETES,
291 );
292 set_if_positive(
293 &mut properties,
294 self.added_eq_deletes,
295 ADDED_EQUALITY_DELETES,
296 );
297 set_if_positive(
298 &mut properties,
299 self.removed_eq_deletes,
300 REMOVED_EQUALITY_DELETES,
301 );
302 properties
303 }
304
305 fn merge(&mut self, other: &UpdateMetrics) {
306 self.added_file_size += other.added_file_size;
307 self.removed_file_size += other.removed_file_size;
308 self.added_data_files += other.added_data_files;
309 self.removed_data_files += other.removed_data_files;
310 self.added_eq_delete_files += other.added_eq_delete_files;
311 self.removed_eq_delete_files += other.removed_eq_delete_files;
312 self.added_pos_delete_files += other.added_pos_delete_files;
313 self.removed_pos_delete_files += other.removed_pos_delete_files;
314 self.added_delete_files += other.added_delete_files;
315 self.removed_delete_files += other.removed_delete_files;
316 self.added_records += other.added_records;
317 self.deleted_records += other.deleted_records;
318 self.added_pos_deletes += other.added_pos_deletes;
319 self.removed_pos_deletes += other.removed_pos_deletes;
320 self.added_eq_deletes += other.added_eq_deletes;
321 self.removed_eq_deletes += other.removed_eq_deletes;
322 }
323}
324
/// Inserts `property_name -> value.to_string()` into `properties`, but only when `value` is
/// strictly greater than `T::default()`.
///
/// Zero, negative and incomparable values (such as a floating-point NaN) are skipped.
fn set_if_positive<T>(properties: &mut HashMap<String, String>, value: T, property_name: &str)
where T: PartialOrd + Default + ToString {
    let zero = T::default();
    if value > zero {
        let rendered = value.to_string();
        properties.insert(property_name.to_owned(), rendered);
    }
}
331
332#[allow(dead_code)]
333pub(crate) fn update_snapshot_summaries(
334 summary: Summary,
335 previous_summary: Option<&Summary>,
336 truncate_full_table: bool,
337) -> Result<Summary> {
338 if summary.operation != Operation::Append
340 && summary.operation != Operation::Overwrite
341 && summary.operation != Operation::Delete
342 {
343 return Err(Error::new(
344 ErrorKind::DataInvalid,
345 "Operation is not supported.",
346 ));
347 }
348
349 let mut summary = match previous_summary {
350 Some(prev_summary) if truncate_full_table && summary.operation == Operation::Overwrite => {
351 truncate_table_summary(summary, prev_summary)
352 .map_err(|err| {
353 Error::new(ErrorKind::Unexpected, "Failed to truncate table summary.")
354 .with_source(err)
355 })
356 .unwrap()
357 }
358 _ => summary,
359 };
360
361 update_totals(
362 &mut summary,
363 previous_summary,
364 TOTAL_DATA_FILES,
365 ADDED_DATA_FILES,
366 DELETED_DATA_FILES,
367 );
368
369 update_totals(
370 &mut summary,
371 previous_summary,
372 TOTAL_DELETE_FILES,
373 ADDED_DELETE_FILES,
374 REMOVED_DELETE_FILES,
375 );
376
377 update_totals(
378 &mut summary,
379 previous_summary,
380 TOTAL_RECORDS,
381 ADDED_RECORDS,
382 DELETED_RECORDS,
383 );
384
385 update_totals(
386 &mut summary,
387 previous_summary,
388 TOTAL_FILE_SIZE,
389 ADDED_FILE_SIZE,
390 REMOVED_FILE_SIZE,
391 );
392
393 update_totals(
394 &mut summary,
395 previous_summary,
396 TOTAL_POSITION_DELETES,
397 ADDED_POSITION_DELETES,
398 REMOVED_POSITION_DELETES,
399 );
400
401 update_totals(
402 &mut summary,
403 previous_summary,
404 TOTAL_EQUALITY_DELETES,
405 ADDED_EQUALITY_DELETES,
406 REMOVED_EQUALITY_DELETES,
407 );
408 Ok(summary)
409}
410
411#[allow(dead_code)]
412fn get_prop(previous_summary: &Summary, prop: &str) -> Result<i32> {
413 let value_str = previous_summary
414 .additional_properties
415 .get(prop)
416 .map(String::as_str)
417 .unwrap_or("0");
418 value_str.parse::<i32>().map_err(|err| {
419 Error::new(
420 ErrorKind::Unexpected,
421 "Failed to parse value from previous summary property.",
422 )
423 .with_source(err)
424 })
425}
426
427#[allow(dead_code)]
428fn truncate_table_summary(mut summary: Summary, previous_summary: &Summary) -> Result<Summary> {
429 for prop in [
430 TOTAL_DATA_FILES,
431 TOTAL_DELETE_FILES,
432 TOTAL_RECORDS,
433 TOTAL_FILE_SIZE,
434 TOTAL_POSITION_DELETES,
435 TOTAL_EQUALITY_DELETES,
436 ] {
437 summary
438 .additional_properties
439 .insert(prop.to_string(), "0".to_string());
440 }
441
442 let value = get_prop(previous_summary, TOTAL_DATA_FILES)?;
443 if value != 0 {
444 summary
445 .additional_properties
446 .insert(DELETED_DATA_FILES.to_string(), value.to_string());
447 }
448 let value = get_prop(previous_summary, TOTAL_DELETE_FILES)?;
449 if value != 0 {
450 summary
451 .additional_properties
452 .insert(REMOVED_DELETE_FILES.to_string(), value.to_string());
453 }
454 let value = get_prop(previous_summary, TOTAL_RECORDS)?;
455 if value != 0 {
456 summary
457 .additional_properties
458 .insert(DELETED_RECORDS.to_string(), value.to_string());
459 }
460 let value = get_prop(previous_summary, TOTAL_FILE_SIZE)?;
461 if value != 0 {
462 summary
463 .additional_properties
464 .insert(REMOVED_FILE_SIZE.to_string(), value.to_string());
465 }
466
467 let value = get_prop(previous_summary, TOTAL_POSITION_DELETES)?;
468 if value != 0 {
469 summary
470 .additional_properties
471 .insert(REMOVED_POSITION_DELETES.to_string(), value.to_string());
472 }
473
474 let value = get_prop(previous_summary, TOTAL_EQUALITY_DELETES)?;
475 if value != 0 {
476 summary
477 .additional_properties
478 .insert(REMOVED_EQUALITY_DELETES.to_string(), value.to_string());
479 }
480
481 Ok(summary)
482}
483
484#[allow(dead_code)]
485fn update_totals(
486 summary: &mut Summary,
487 previous_summary: Option<&Summary>,
488 total_property: &str,
489 added_property: &str,
490 removed_property: &str,
491) {
492 let previous_total = previous_summary.map_or(0, |previous_summary| {
493 previous_summary
494 .additional_properties
495 .get(total_property)
496 .map_or(0, |value| value.parse::<u64>().unwrap())
497 });
498
499 let mut new_total = previous_total;
500 if let Some(value) = summary
501 .additional_properties
502 .get(added_property)
503 .map(|value| value.parse::<u64>().unwrap())
504 {
505 new_total += value;
506 }
507 if let Some(value) = summary
508 .additional_properties
509 .get(removed_property)
510 .map(|value| value.parse::<u64>().unwrap())
511 {
512 new_total -= value;
513 }
514 summary
515 .additional_properties
516 .insert(total_property.to_string(), new_total.to_string());
517}
518
/// Unit tests for the snapshot-summary helpers and `SnapshotSummaryCollector`.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use super::*;
    use crate::spec::{
        DataFileFormat, Datum, Literal, NestedField, PartitionSpec, PrimitiveType, Schema, Struct,
        Transform, Type, UnboundPartitionField,
    };

    /// For an append, each total must equal previous total + added - removed.
    #[test]
    fn test_update_snapshot_summaries_append() {
        // Totals recorded by the previous snapshot.
        let prev_props: HashMap<String, String> = [
            (TOTAL_DATA_FILES.to_string(), "10".to_string()),
            (TOTAL_DELETE_FILES.to_string(), "5".to_string()),
            (TOTAL_RECORDS.to_string(), "100".to_string()),
            (TOTAL_FILE_SIZE.to_string(), "1000".to_string()),
            (TOTAL_POSITION_DELETES.to_string(), "3".to_string()),
            (TOTAL_EQUALITY_DELETES.to_string(), "2".to_string()),
        ]
        .into_iter()
        .collect();

        let previous_summary = Summary {
            operation: Operation::Append,
            additional_properties: prev_props,
        };

        // Additions and removals made by the new snapshot.
        let new_props: HashMap<String, String> = [
            (ADDED_DATA_FILES.to_string(), "4".to_string()),
            (DELETED_DATA_FILES.to_string(), "1".to_string()),
            (ADDED_DELETE_FILES.to_string(), "2".to_string()),
            (REMOVED_DELETE_FILES.to_string(), "1".to_string()),
            (ADDED_RECORDS.to_string(), "40".to_string()),
            (DELETED_RECORDS.to_string(), "10".to_string()),
            (ADDED_FILE_SIZE.to_string(), "400".to_string()),
            (REMOVED_FILE_SIZE.to_string(), "100".to_string()),
            (ADDED_POSITION_DELETES.to_string(), "5".to_string()),
            (REMOVED_POSITION_DELETES.to_string(), "2".to_string()),
            (ADDED_EQUALITY_DELETES.to_string(), "3".to_string()),
            (REMOVED_EQUALITY_DELETES.to_string(), "1".to_string()),
        ]
        .into_iter()
        .collect();

        let summary = Summary {
            operation: Operation::Append,
            additional_properties: new_props,
        };

        let updated = update_snapshot_summaries(summary, Some(&previous_summary), false).unwrap();

        // Expected totals:
        //   data files         10 + 4 - 1     = 13
        //   delete files        5 + 2 - 1     = 6
        //   records           100 + 40 - 10   = 130
        //   file size        1000 + 400 - 100 = 1300
        //   position deletes    3 + 5 - 2     = 6
        //   equality deletes    2 + 3 - 1     = 4
        assert_eq!(
            updated.additional_properties.get(TOTAL_DATA_FILES).unwrap(),
            "13"
        );
        assert_eq!(
            updated
                .additional_properties
                .get(TOTAL_DELETE_FILES)
                .unwrap(),
            "6"
        );
        assert_eq!(
            updated.additional_properties.get(TOTAL_RECORDS).unwrap(),
            "130"
        );
        assert_eq!(
            updated.additional_properties.get(TOTAL_FILE_SIZE).unwrap(),
            "1300"
        );
        assert_eq!(
            updated
                .additional_properties
                .get(TOTAL_POSITION_DELETES)
                .unwrap(),
            "6"
        );
        assert_eq!(
            updated
                .additional_properties
                .get(TOTAL_EQUALITY_DELETES)
                .unwrap(),
            "4"
        );
    }

    /// Truncation resets every total to "0" and records each previous total as the matching
    /// removal property.
    #[test]
    fn test_truncate_table_summary() {
        let prev_props: HashMap<String, String> = [
            (TOTAL_DATA_FILES.to_string(), "10".to_string()),
            (TOTAL_DELETE_FILES.to_string(), "5".to_string()),
            (TOTAL_RECORDS.to_string(), "100".to_string()),
            (TOTAL_FILE_SIZE.to_string(), "1000".to_string()),
            (TOTAL_POSITION_DELETES.to_string(), "3".to_string()),
            (TOTAL_EQUALITY_DELETES.to_string(), "2".to_string()),
        ]
        .into_iter()
        .collect();

        let previous_summary = Summary {
            operation: Operation::Overwrite,
            additional_properties: prev_props,
        };

        // A placeholder property; the new summary carries no metrics of its own.
        let mut new_props = HashMap::new();
        new_props.insert("dummy".to_string(), "value".to_string());
        let summary = Summary {
            operation: Operation::Overwrite,
            additional_properties: new_props,
        };

        let truncated = truncate_table_summary(summary, &previous_summary).unwrap();

        // Every total is reset to zero.
        assert_eq!(
            truncated
                .additional_properties
                .get(TOTAL_DATA_FILES)
                .unwrap(),
            "0"
        );
        assert_eq!(
            truncated
                .additional_properties
                .get(TOTAL_DELETE_FILES)
                .unwrap(),
            "0"
        );
        assert_eq!(
            truncated.additional_properties.get(TOTAL_RECORDS).unwrap(),
            "0"
        );
        assert_eq!(
            truncated
                .additional_properties
                .get(TOTAL_FILE_SIZE)
                .unwrap(),
            "0"
        );
        assert_eq!(
            truncated
                .additional_properties
                .get(TOTAL_POSITION_DELETES)
                .unwrap(),
            "0"
        );
        assert_eq!(
            truncated
                .additional_properties
                .get(TOTAL_EQUALITY_DELETES)
                .unwrap(),
            "0"
        );

        // The previous totals reappear as removals.
        assert_eq!(
            truncated
                .additional_properties
                .get(DELETED_DATA_FILES)
                .unwrap(),
            "10"
        );
        assert_eq!(
            truncated
                .additional_properties
                .get(REMOVED_DELETE_FILES)
                .unwrap(),
            "5"
        );
        assert_eq!(
            truncated
                .additional_properties
                .get(DELETED_RECORDS)
                .unwrap(),
            "100"
        );
        assert_eq!(
            truncated
                .additional_properties
                .get(REMOVED_FILE_SIZE)
                .unwrap(),
            "1000"
        );
        assert_eq!(
            truncated
                .additional_properties
                .get(REMOVED_POSITION_DELETES)
                .unwrap(),
            "3"
        );
        assert_eq!(
            truncated
                .additional_properties
                .get(REMOVED_EQUALITY_DELETES)
                .unwrap(),
            "2"
        );
    }

    /// `build` reports aggregate sizes plus a `partitions.<path>` entry for a file that has a
    /// partition value.
    #[test]
    fn test_snapshot_summary_collector_build() {
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
                ])
                .build()
                .unwrap(),
        );

        // Identity partition named "year", sourced from field 2 ("name").
        let partition_spec = Arc::new(
            PartitionSpec::builder(schema.clone())
                .add_unbound_fields(vec![
                    UnboundPartitionField::builder()
                        .source_id(2)
                        .name("year".to_string())
                        .transform(Transform::Identity)
                        .build(),
                ])
                .unwrap()
                .with_spec_id(1)
                .build()
                .unwrap(),
        );

        let mut collector = SnapshotSummaryCollector::default();
        collector.set_partition_summary_limit(10);

        // file1 has an empty partition tuple, so it only affects the aggregate metrics.
        // The column statistics below are fixture filler; the collector reads only content
        // type, size, record count and partition.
        let file1 = DataFile {
            content: DataContentType::Data,
            file_path: "s3://testbucket/path/to/file1.parquet".to_string(),
            file_format: DataFileFormat::Parquet,
            partition: Struct::from_iter(vec![]),
            record_count: 10,
            file_size_in_bytes: 100,
            column_sizes: HashMap::from([(1, 46), (2, 48), (3, 48)]),
            value_counts: HashMap::from([(1, 10), (2, 10), (3, 10)]),
            null_value_counts: HashMap::from([(1, 0), (2, 0), (3, 0)]),
            nan_value_counts: HashMap::new(),
            lower_bounds: HashMap::from([
                (1, Datum::long(1)),
                (2, Datum::string("a")),
                (3, Datum::string("x")),
            ]),
            upper_bounds: HashMap::from([
                (1, Datum::long(1)),
                (2, Datum::string("a")),
                (3, Datum::string("x")),
            ]),
            key_metadata: None,
            split_offsets: vec![4],
            equality_ids: None,
            sort_order_id: Some(0),
            partition_spec_id: 0,
            first_row_id: None,
            referenced_data_file: None,
            content_offset: None,
            content_size_in_bytes: None,
        };

        // file2 lives in partition year=2025, so it also produces a partition summary.
        let file2 = DataFile {
            content: DataContentType::Data,
            file_path: "s3://testbucket/path/to/file2.parquet".to_string(),
            file_format: DataFileFormat::Parquet,
            partition: Struct::from_iter(vec![Some(Literal::string("2025"))]),
            record_count: 20,
            file_size_in_bytes: 200,
            column_sizes: HashMap::from([(1, 46), (2, 48), (3, 48)]),
            value_counts: HashMap::from([(1, 20), (2, 20), (3, 20)]),
            null_value_counts: HashMap::from([(1, 0), (2, 0), (3, 0)]),
            nan_value_counts: HashMap::new(),
            lower_bounds: HashMap::from([
                (1, Datum::long(1)),
                (2, Datum::string("a")),
                (3, Datum::string("x")),
            ]),
            upper_bounds: HashMap::from([
                (1, Datum::long(1)),
                (2, Datum::string("a")),
                (3, Datum::string("x")),
            ]),
            key_metadata: None,
            split_offsets: vec![4],
            equality_ids: None,
            sort_order_id: Some(0),
            partition_spec_id: 0,
            first_row_id: None,
            referenced_data_file: None,
            content_offset: None,
            content_size_in_bytes: None,
        };

        collector.add_file(&file1, schema.clone(), partition_spec.clone());
        collector.add_file(&file2, schema.clone(), partition_spec.clone());

        collector.remove_file(&file1, schema.clone(), partition_spec.clone());

        let props = collector.build();

        // Added: 100 + 200 bytes. Removed: file1's 100 bytes.
        assert_eq!(props.get(ADDED_FILE_SIZE).unwrap(), "300");
        assert_eq!(props.get(REMOVED_FILE_SIZE).unwrap(), "100");

        let partition_key = format!("{}{}", CHANGED_PARTITION_PREFIX, "year=2025");

        assert!(props.contains_key(&partition_key));

        // The partition summary reflects only file2.
        let partition_summary = props.get(&partition_key).unwrap();
        assert!(partition_summary.contains(&format!("{ADDED_FILE_SIZE}=200")));
        assert!(partition_summary.contains(&format!("{ADDED_DATA_FILES}=1")));
        assert!(partition_summary.contains(&format!("{ADDED_RECORDS}=20")));
    }

    /// `add_manifest` contributes a data manifest's aggregate file and row counts.
    #[test]
    fn test_snapshot_summary_collector_add_manifest() {
        let mut collector = SnapshotSummaryCollector::default();
        collector.set_partition_summary_limit(10);

        let manifest = ManifestFile {
            manifest_path: "file://dummy.manifest".to_string(),
            manifest_length: 0,
            partition_spec_id: 0,
            content: ManifestContentType::Data,
            sequence_number: 0,
            min_sequence_number: 0,
            added_snapshot_id: 0,
            added_files_count: Some(3),
            existing_files_count: Some(0),
            deleted_files_count: Some(1),
            added_rows_count: Some(100),
            existing_rows_count: Some(0),
            deleted_rows_count: Some(50),
            partitions: Some(Vec::new()),
            key_metadata: None,
            first_row_id: None,
        };

        // Pre-seed a partition entry; `add_manifest` clears the partition metrics.
        collector
            .partition_metrics
            .insert("dummy".to_string(), UpdateMetrics::default());
        collector.add_manifest(&manifest);

        let props = collector.build();
        assert_eq!(props.get(ADDED_DATA_FILES).unwrap(), "3");
        assert_eq!(props.get(DELETED_DATA_FILES).unwrap(), "1");
        assert_eq!(props.get(ADDED_RECORDS).unwrap(), "100");
        assert_eq!(props.get(DELETED_RECORDS).unwrap(), "50");
    }

    /// `merge` sums aggregate metrics, and drops partition metrics once either side has added
    /// a manifest.
    #[test]
    fn test_snapshot_summary_collector_merge() {
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
                ])
                .build()
                .unwrap(),
        );

        let partition_spec = Arc::new(
            PartitionSpec::builder(schema.clone())
                .add_unbound_fields(vec![
                    UnboundPartitionField::builder()
                        .source_id(2)
                        .name("year".to_string())
                        .transform(Transform::Identity)
                        .build(),
                ])
                .unwrap()
                .with_spec_id(1)
                .build()
                .unwrap(),
        );

        // Case 1: two collectors that each added one unpartitioned file.
        let mut summary_one = SnapshotSummaryCollector::default();
        let mut summary_two = SnapshotSummaryCollector::default();

        summary_one.add_file(
            &DataFile {
                content: DataContentType::Data,
                file_path: "test.parquet".into(),
                file_format: DataFileFormat::Parquet,
                partition: Struct::from_iter(vec![]),
                record_count: 10,
                file_size_in_bytes: 100,
                column_sizes: HashMap::new(),
                value_counts: HashMap::new(),
                null_value_counts: HashMap::new(),
                nan_value_counts: HashMap::new(),
                lower_bounds: HashMap::new(),
                upper_bounds: HashMap::new(),
                key_metadata: None,
                split_offsets: vec![],
                equality_ids: None,
                sort_order_id: None,
                partition_spec_id: 0,
                first_row_id: None,
                referenced_data_file: None,
                content_offset: None,
                content_size_in_bytes: None,
            },
            schema.clone(),
            partition_spec.clone(),
        );

        summary_two.add_file(
            &DataFile {
                content: DataContentType::Data,
                file_path: "test.parquet".into(),
                file_format: DataFileFormat::Parquet,
                partition: Struct::from_iter(vec![]),
                record_count: 20,
                file_size_in_bytes: 200,
                column_sizes: HashMap::new(),
                value_counts: HashMap::new(),
                null_value_counts: HashMap::new(),
                nan_value_counts: HashMap::new(),
                lower_bounds: HashMap::new(),
                upper_bounds: HashMap::new(),
                key_metadata: None,
                split_offsets: vec![],
                equality_ids: None,
                sort_order_id: None,
                partition_spec_id: 0,
                first_row_id: None,
                referenced_data_file: None,
                content_offset: None,
                content_size_in_bytes: None,
            },
            schema.clone(),
            partition_spec.clone(),
        );

        // 1 + 1 files and 10 + 20 records.
        summary_one.merge(summary_two);
        let props = summary_one.build();
        assert_eq!(props.get(ADDED_DATA_FILES).unwrap(), "2");
        assert_eq!(props.get(ADDED_RECORDS).unwrap(), "30");

        // Case 2: one collector added a manifest, which marks its partition metrics untrusted.
        let mut summary_three = SnapshotSummaryCollector::default();
        let mut summary_four = SnapshotSummaryCollector::default();

        summary_three.add_manifest(&ManifestFile {
            manifest_path: "test.manifest".to_string(),
            manifest_length: 0,
            partition_spec_id: 0,
            content: ManifestContentType::Data,
            sequence_number: 0,
            min_sequence_number: 0,
            added_snapshot_id: 0,
            added_files_count: Some(1),
            existing_files_count: Some(0),
            deleted_files_count: Some(0),
            added_rows_count: Some(5),
            existing_rows_count: Some(0),
            deleted_rows_count: Some(0),
            partitions: Some(Vec::new()),
            key_metadata: None,
            first_row_id: None,
        });

        summary_four.add_file(
            &DataFile {
                content: DataContentType::Data,
                file_path: "test.parquet".into(),
                file_format: DataFileFormat::Parquet,
                partition: Struct::from_iter(vec![]),
                record_count: 1,
                file_size_in_bytes: 10,
                column_sizes: HashMap::new(),
                value_counts: HashMap::new(),
                null_value_counts: HashMap::new(),
                nan_value_counts: HashMap::new(),
                lower_bounds: HashMap::new(),
                upper_bounds: HashMap::new(),
                key_metadata: None,
                split_offsets: vec![],
                equality_ids: None,
                sort_order_id: None,
                partition_spec_id: 0,
                first_row_id: None,
                referenced_data_file: None,
                content_offset: None,
                content_size_in_bytes: None,
            },
            schema.clone(),
            partition_spec.clone(),
        );

        summary_three.merge(summary_four);
        let props = summary_three.build();

        // Aggregates still combine: 1 + 1 files and 5 + 1 records.
        assert_eq!(props.get(ADDED_DATA_FILES).unwrap(), "2");
        assert_eq!(props.get(ADDED_RECORDS).unwrap(), "6");
        // Partition metrics were dropped, so no per-partition keys are emitted.
        assert!(
            props
                .iter()
                .all(|(k, _)| !k.starts_with(CHANGED_PARTITION_PREFIX))
        );
    }
}