iceberg/spec/
snapshot_summary.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19
20use itertools::Itertools;
21
22use super::{DataContentType, DataFile, PartitionSpecRef};
23use crate::spec::{ManifestContentType, ManifestFile, Operation, SchemaRef, Summary};
24use crate::{Error, ErrorKind, Result};
25
// Summary keys for what a snapshot adds.
const ADDED_DATA_FILES: &str = "added-data-files";
const ADDED_DELETE_FILES: &str = "added-delete-files";
const ADDED_EQUALITY_DELETE_FILES: &str = "added-equality-delete-files";
const ADDED_POSITION_DELETE_FILES: &str = "added-position-delete-files";
const ADDED_EQUALITY_DELETES: &str = "added-equality-deletes";
const ADDED_POSITION_DELETES: &str = "added-position-deletes";
const ADDED_RECORDS: &str = "added-records";
const ADDED_FILE_SIZE: &str = "added-files-size";

// Summary keys for what a snapshot removes.
const DELETED_DATA_FILES: &str = "deleted-data-files";
const REMOVED_DELETE_FILES: &str = "removed-delete-files";
const REMOVED_EQUALITY_DELETE_FILES: &str = "removed-equality-delete-files";
const REMOVED_POSITION_DELETE_FILES: &str = "removed-position-delete-files";
const REMOVED_EQUALITY_DELETES: &str = "removed-equality-deletes";
const REMOVED_POSITION_DELETES: &str = "removed-position-deletes";
const DELETED_RECORDS: &str = "deleted-records";
const REMOVED_FILE_SIZE: &str = "removed-files-size";

// Summary keys for the table-wide totals that are carried forward from the previous snapshot.
const TOTAL_DATA_FILES: &str = "total-data-files";
const TOTAL_DELETE_FILES: &str = "total-delete-files";
const TOTAL_EQUALITY_DELETES: &str = "total-equality-deletes";
const TOTAL_POSITION_DELETES: &str = "total-position-deletes";
const TOTAL_RECORDS: &str = "total-records";
const TOTAL_FILE_SIZE: &str = "total-files-size";

// Partition reporting: a count key, and a prefix that is followed by the partition path.
const CHANGED_PARTITION_COUNT_PROP: &str = "changed-partition-count";
const CHANGED_PARTITION_PREFIX: &str = "partitions.";
50
51/// `SnapshotSummaryCollector` collects and aggregates snapshot update metrics.
52/// It gathers metrics about added or removed data files and manifests, and tracks
53/// partition-specific updates.
54#[derive(Default)]
55pub struct SnapshotSummaryCollector {
56    metrics: UpdateMetrics,
57    partition_metrics: HashMap<String, UpdateMetrics>,
58    max_changed_partitions_for_summaries: u64,
59    properties: HashMap<String, String>,
60    trust_partition_metrics: bool,
61}
62
63impl SnapshotSummaryCollector {
64    /// Set properties for snapshot summary
65    pub fn set(&mut self, key: &str, value: &str) {
66        self.properties.insert(key.to_string(), value.to_string());
67    }
68
69    /// Sets the limit for including partition summaries. Summaries are not
70    /// included if the number of partitions is exceeded.
71    pub fn set_partition_summary_limit(&mut self, limit: u64) {
72        self.max_changed_partitions_for_summaries = limit;
73    }
74
75    /// Adds a data file to the summary collector
76    pub fn add_file(
77        &mut self,
78        data_file: &DataFile,
79        schema: SchemaRef,
80        partition_spec: PartitionSpecRef,
81    ) {
82        self.metrics.add_file(data_file);
83        if !data_file.partition.fields().is_empty() {
84            self.update_partition_metrics(schema, partition_spec, data_file, true);
85        }
86    }
87
88    /// Removes a data file from the summary collector
89    pub fn remove_file(
90        &mut self,
91        data_file: &DataFile,
92        schema: SchemaRef,
93        partition_spec: PartitionSpecRef,
94    ) {
95        self.metrics.remove_file(data_file);
96        if !data_file.partition.fields().is_empty() {
97            self.update_partition_metrics(schema, partition_spec, data_file, false);
98        }
99    }
100
101    /// Adds a manifest to the summary collector
102    pub fn add_manifest(&mut self, manifest: &ManifestFile) {
103        self.trust_partition_metrics = false;
104        self.partition_metrics.clear();
105        self.metrics.add_manifest(manifest);
106    }
107
108    /// Updates partition-specific metrics for a data file.
109    pub fn update_partition_metrics(
110        &mut self,
111        schema: SchemaRef,
112        partition_spec: PartitionSpecRef,
113        data_file: &DataFile,
114        is_add_file: bool,
115    ) {
116        let partition_path = partition_spec.partition_to_path(&data_file.partition, schema);
117        let metrics = self.partition_metrics.entry(partition_path).or_default();
118
119        if is_add_file {
120            metrics.add_file(data_file);
121        } else {
122            metrics.remove_file(data_file);
123        }
124    }
125
126    /// Merges another `SnapshotSummaryCollector` into the current one
127    pub fn merge(&mut self, summary: SnapshotSummaryCollector) {
128        self.metrics.merge(&summary.metrics);
129        self.properties.extend(summary.properties);
130
131        if self.trust_partition_metrics && summary.trust_partition_metrics {
132            for (partition, partition_metric) in summary.partition_metrics.iter() {
133                self.partition_metrics
134                    .entry(partition.to_string())
135                    .or_default()
136                    .merge(partition_metric);
137            }
138        } else {
139            self.partition_metrics.clear();
140            self.trust_partition_metrics = false;
141        }
142    }
143
144    /// Builds final map of summaries
145    pub fn build(&self) -> HashMap<String, String> {
146        let mut properties = self.metrics.to_map();
147        let changed_partitions_count = self.partition_metrics.len() as u64;
148        set_if_positive(
149            &mut properties,
150            changed_partitions_count,
151            CHANGED_PARTITION_COUNT_PROP,
152        );
153
154        if changed_partitions_count <= self.max_changed_partitions_for_summaries {
155            for (partition_path, update_metrics_partition) in &self.partition_metrics {
156                let property_key = format!("{CHANGED_PARTITION_PREFIX}{partition_path}");
157                let partition_summary = update_metrics_partition
158                    .to_map()
159                    .into_iter()
160                    .map(|(property, value)| format!("{property}={value}"))
161                    .join(",");
162
163                if !partition_summary.is_empty() {
164                    properties.insert(property_key, partition_summary);
165                }
166            }
167        }
168        properties
169    }
170}
171
172#[derive(Debug, Default)]
173struct UpdateMetrics {
174    added_file_size: u64,
175    removed_file_size: u64,
176    added_data_files: u32,
177    removed_data_files: u32,
178    added_eq_delete_files: u64,
179    removed_eq_delete_files: u64,
180    added_pos_delete_files: u64,
181    removed_pos_delete_files: u64,
182    added_delete_files: u32,
183    removed_delete_files: u32,
184    added_records: u64,
185    deleted_records: u64,
186    added_pos_deletes: u64,
187    removed_pos_deletes: u64,
188    added_eq_deletes: u64,
189    removed_eq_deletes: u64,
190}
191
192impl UpdateMetrics {
193    fn add_file(&mut self, data_file: &DataFile) {
194        self.added_file_size += data_file.file_size_in_bytes;
195        match data_file.content_type() {
196            DataContentType::Data => {
197                self.added_data_files += 1;
198                self.added_records += data_file.record_count;
199            }
200            DataContentType::PositionDeletes => {
201                self.added_delete_files += 1;
202                self.added_pos_delete_files += 1;
203                self.added_pos_deletes += data_file.record_count;
204            }
205            DataContentType::EqualityDeletes => {
206                self.added_delete_files += 1;
207                self.added_eq_delete_files += 1;
208                self.added_eq_deletes += data_file.record_count;
209            }
210        }
211    }
212
213    fn remove_file(&mut self, data_file: &DataFile) {
214        self.removed_file_size += data_file.file_size_in_bytes;
215        match data_file.content_type() {
216            DataContentType::Data => {
217                self.removed_data_files += 1;
218                self.deleted_records += data_file.record_count;
219            }
220            DataContentType::PositionDeletes => {
221                self.removed_delete_files += 1;
222                self.removed_pos_delete_files += 1;
223                self.removed_pos_deletes += data_file.record_count;
224            }
225            DataContentType::EqualityDeletes => {
226                self.removed_delete_files += 1;
227                self.removed_eq_delete_files += 1;
228                self.removed_eq_deletes += data_file.record_count;
229            }
230        }
231    }
232
233    fn add_manifest(&mut self, manifest: &ManifestFile) {
234        match manifest.content {
235            ManifestContentType::Data => {
236                self.added_data_files += manifest.added_files_count.unwrap_or(0);
237                self.added_records += manifest.added_rows_count.unwrap_or(0);
238                self.removed_data_files += manifest.deleted_files_count.unwrap_or(0);
239                self.deleted_records += manifest.deleted_rows_count.unwrap_or(0);
240            }
241            ManifestContentType::Deletes => {
242                self.added_delete_files += manifest.added_files_count.unwrap_or(0);
243                self.removed_delete_files += manifest.deleted_files_count.unwrap_or(0);
244            }
245        }
246    }
247
248    fn to_map(&self) -> HashMap<String, String> {
249        let mut properties = HashMap::new();
250        set_if_positive(&mut properties, self.added_file_size, ADDED_FILE_SIZE);
251        set_if_positive(&mut properties, self.removed_file_size, REMOVED_FILE_SIZE);
252        set_if_positive(&mut properties, self.added_data_files, ADDED_DATA_FILES);
253        set_if_positive(&mut properties, self.removed_data_files, DELETED_DATA_FILES);
254        set_if_positive(
255            &mut properties,
256            self.added_eq_delete_files,
257            ADDED_EQUALITY_DELETE_FILES,
258        );
259        set_if_positive(
260            &mut properties,
261            self.removed_eq_delete_files,
262            REMOVED_EQUALITY_DELETE_FILES,
263        );
264        set_if_positive(
265            &mut properties,
266            self.added_pos_delete_files,
267            ADDED_POSITION_DELETE_FILES,
268        );
269        set_if_positive(
270            &mut properties,
271            self.removed_pos_delete_files,
272            REMOVED_POSITION_DELETE_FILES,
273        );
274        set_if_positive(&mut properties, self.added_delete_files, ADDED_DELETE_FILES);
275        set_if_positive(
276            &mut properties,
277            self.removed_delete_files,
278            REMOVED_DELETE_FILES,
279        );
280        set_if_positive(&mut properties, self.added_records, ADDED_RECORDS);
281        set_if_positive(&mut properties, self.deleted_records, DELETED_RECORDS);
282        set_if_positive(
283            &mut properties,
284            self.added_pos_deletes,
285            ADDED_POSITION_DELETES,
286        );
287        set_if_positive(
288            &mut properties,
289            self.removed_pos_deletes,
290            REMOVED_POSITION_DELETES,
291        );
292        set_if_positive(
293            &mut properties,
294            self.added_eq_deletes,
295            ADDED_EQUALITY_DELETES,
296        );
297        set_if_positive(
298            &mut properties,
299            self.removed_eq_deletes,
300            REMOVED_EQUALITY_DELETES,
301        );
302        properties
303    }
304
305    fn merge(&mut self, other: &UpdateMetrics) {
306        self.added_file_size += other.added_file_size;
307        self.removed_file_size += other.removed_file_size;
308        self.added_data_files += other.added_data_files;
309        self.removed_data_files += other.removed_data_files;
310        self.added_eq_delete_files += other.added_eq_delete_files;
311        self.removed_eq_delete_files += other.removed_eq_delete_files;
312        self.added_pos_delete_files += other.added_pos_delete_files;
313        self.removed_pos_delete_files += other.removed_pos_delete_files;
314        self.added_delete_files += other.added_delete_files;
315        self.removed_delete_files += other.removed_delete_files;
316        self.added_records += other.added_records;
317        self.deleted_records += other.deleted_records;
318        self.added_pos_deletes += other.added_pos_deletes;
319        self.removed_pos_deletes += other.removed_pos_deletes;
320        self.added_eq_deletes += other.added_eq_deletes;
321        self.removed_eq_deletes += other.removed_eq_deletes;
322    }
323}
324
/// Inserts `value` (rendered with `to_string`) under `property_name`, but only when
/// `value` is strictly greater than `T::default()`, i.e. zero for numeric counters.
fn set_if_positive<T>(properties: &mut HashMap<String, String>, value: T, property_name: &str)
where T: PartialOrd + Default + ToString {
    let threshold = T::default();
    let is_positive = value > threshold;
    if is_positive {
        properties.insert(property_name.to_owned(), value.to_string());
    }
}
331
332pub(crate) fn update_snapshot_summaries(
333    summary: Summary,
334    previous_summary: Option<&Summary>,
335    truncate_full_table: bool,
336) -> Result<Summary> {
337    // Validate that the operation is supported
338    if summary.operation != Operation::Append
339        && summary.operation != Operation::Overwrite
340        && summary.operation != Operation::Delete
341    {
342        return Err(Error::new(
343            ErrorKind::DataInvalid,
344            "Operation is not supported.",
345        ));
346    }
347
348    let mut summary = match previous_summary {
349        Some(prev_summary) if truncate_full_table && summary.operation == Operation::Overwrite => {
350            truncate_table_summary(summary, prev_summary).map_err(|err| {
351                Error::new(ErrorKind::Unexpected, "Failed to truncate table summary.")
352                    .with_source(err)
353            })?
354        }
355        _ => summary,
356    };
357
358    update_totals(
359        &mut summary,
360        previous_summary,
361        TOTAL_DATA_FILES,
362        ADDED_DATA_FILES,
363        DELETED_DATA_FILES,
364    );
365
366    update_totals(
367        &mut summary,
368        previous_summary,
369        TOTAL_DELETE_FILES,
370        ADDED_DELETE_FILES,
371        REMOVED_DELETE_FILES,
372    );
373
374    update_totals(
375        &mut summary,
376        previous_summary,
377        TOTAL_RECORDS,
378        ADDED_RECORDS,
379        DELETED_RECORDS,
380    );
381
382    update_totals(
383        &mut summary,
384        previous_summary,
385        TOTAL_FILE_SIZE,
386        ADDED_FILE_SIZE,
387        REMOVED_FILE_SIZE,
388    );
389
390    update_totals(
391        &mut summary,
392        previous_summary,
393        TOTAL_POSITION_DELETES,
394        ADDED_POSITION_DELETES,
395        REMOVED_POSITION_DELETES,
396    );
397
398    update_totals(
399        &mut summary,
400        previous_summary,
401        TOTAL_EQUALITY_DELETES,
402        ADDED_EQUALITY_DELETES,
403        REMOVED_EQUALITY_DELETES,
404    );
405    Ok(summary)
406}
407
408fn get_prop(previous_summary: &Summary, prop: &str) -> Result<u64> {
409    let value_str = previous_summary
410        .additional_properties
411        .get(prop)
412        .map(String::as_str)
413        .unwrap_or("0");
414    value_str.parse::<u64>().map_err(|err| {
415        Error::new(
416            ErrorKind::Unexpected,
417            format!("Failed to parse summary property '{prop}' value '{value_str}' as u64."),
418        )
419        .with_source(err)
420    })
421}
422
423fn truncate_table_summary(mut summary: Summary, previous_summary: &Summary) -> Result<Summary> {
424    for prop in [
425        TOTAL_DATA_FILES,
426        TOTAL_DELETE_FILES,
427        TOTAL_RECORDS,
428        TOTAL_FILE_SIZE,
429        TOTAL_POSITION_DELETES,
430        TOTAL_EQUALITY_DELETES,
431    ] {
432        summary
433            .additional_properties
434            .insert(prop.to_string(), "0".to_string());
435    }
436
437    let value = get_prop(previous_summary, TOTAL_DATA_FILES)?;
438    if value != 0 {
439        summary
440            .additional_properties
441            .insert(DELETED_DATA_FILES.to_string(), value.to_string());
442    }
443    let value = get_prop(previous_summary, TOTAL_DELETE_FILES)?;
444    if value != 0 {
445        summary
446            .additional_properties
447            .insert(REMOVED_DELETE_FILES.to_string(), value.to_string());
448    }
449    let value = get_prop(previous_summary, TOTAL_RECORDS)?;
450    if value != 0 {
451        summary
452            .additional_properties
453            .insert(DELETED_RECORDS.to_string(), value.to_string());
454    }
455    let value = get_prop(previous_summary, TOTAL_FILE_SIZE)?;
456    if value != 0 {
457        summary
458            .additional_properties
459            .insert(REMOVED_FILE_SIZE.to_string(), value.to_string());
460    }
461
462    let value = get_prop(previous_summary, TOTAL_POSITION_DELETES)?;
463    if value != 0 {
464        summary
465            .additional_properties
466            .insert(REMOVED_POSITION_DELETES.to_string(), value.to_string());
467    }
468
469    let value = get_prop(previous_summary, TOTAL_EQUALITY_DELETES)?;
470    if value != 0 {
471        summary
472            .additional_properties
473            .insert(REMOVED_EQUALITY_DELETES.to_string(), value.to_string());
474    }
475
476    Ok(summary)
477}
478
479fn update_totals(
480    summary: &mut Summary,
481    previous_summary: Option<&Summary>,
482    total_property: &str,
483    added_property: &str,
484    removed_property: &str,
485) {
486    let previous_total = match previous_summary {
487        None => 0,
488        Some(prev_summary) => match prev_summary.additional_properties.get(total_property) {
489            Some(value_str) => match value_str.parse::<u64>() {
490                Ok(v) => v,
491                Err(parse_err) => {
492                    tracing::warn!(
493                        "Property '{total_property}' could not be parsed in the previous snapshot summary: {parse_err}. \
494                         Skipping total computation.",
495                    );
496                    return;
497                }
498            },
499            None => {
500                tracing::debug!(
501                    "Property '{total_property}' was not set in the previous snapshot summary. \
502                     Skipping total computation."
503                );
504                return;
505            }
506        },
507    };
508
509    let added = summary
510        .additional_properties
511        .get(added_property)
512        .map_or(0, |value| {
513            value
514                .parse::<u64>()
515                .expect("must be parsable as it was just serialized")
516        });
517    let removed = summary
518        .additional_properties
519        .get(removed_property)
520        .map_or(0, |value| {
521            value
522                .parse::<u64>()
523                .expect("must be parsable as it was just serialized")
524        });
525
526    let new_total = previous_total + added - removed;
527    summary
528        .additional_properties
529        .insert(total_property.to_string(), new_total.to_string());
530}
531
532#[cfg(test)]
533mod tests {
534    use std::collections::HashMap;
535    use std::sync::Arc;
536
537    use super::*;
538    use crate::spec::{
539        DataFileFormat, Datum, Literal, NestedField, PartitionSpec, PrimitiveType, Schema, Struct,
540        Transform, Type, UnboundPartitionField,
541    };
542
543    #[test]
544    fn test_update_snapshot_summaries_append() {
545        let prev_props: HashMap<String, String> = [
546            (TOTAL_DATA_FILES.to_string(), "10".to_string()),
547            (TOTAL_DELETE_FILES.to_string(), "5".to_string()),
548            (TOTAL_RECORDS.to_string(), "100".to_string()),
549            (TOTAL_FILE_SIZE.to_string(), "1000".to_string()),
550            (TOTAL_POSITION_DELETES.to_string(), "3".to_string()),
551            (TOTAL_EQUALITY_DELETES.to_string(), "2".to_string()),
552        ]
553        .into_iter()
554        .collect();
555
556        let previous_summary = Summary {
557            operation: Operation::Append,
558            additional_properties: prev_props,
559        };
560
561        let new_props: HashMap<String, String> = [
562            (ADDED_DATA_FILES.to_string(), "4".to_string()),
563            (DELETED_DATA_FILES.to_string(), "1".to_string()),
564            (ADDED_DELETE_FILES.to_string(), "2".to_string()),
565            (REMOVED_DELETE_FILES.to_string(), "1".to_string()),
566            (ADDED_RECORDS.to_string(), "40".to_string()),
567            (DELETED_RECORDS.to_string(), "10".to_string()),
568            (ADDED_FILE_SIZE.to_string(), "400".to_string()),
569            (REMOVED_FILE_SIZE.to_string(), "100".to_string()),
570            (ADDED_POSITION_DELETES.to_string(), "5".to_string()),
571            (REMOVED_POSITION_DELETES.to_string(), "2".to_string()),
572            (ADDED_EQUALITY_DELETES.to_string(), "3".to_string()),
573            (REMOVED_EQUALITY_DELETES.to_string(), "1".to_string()),
574        ]
575        .into_iter()
576        .collect();
577
578        let summary = Summary {
579            operation: Operation::Append,
580            additional_properties: new_props,
581        };
582
583        let updated = update_snapshot_summaries(summary, Some(&previous_summary), false).unwrap();
584
585        assert_eq!(
586            updated.additional_properties.get(TOTAL_DATA_FILES).unwrap(),
587            "13"
588        );
589        assert_eq!(
590            updated
591                .additional_properties
592                .get(TOTAL_DELETE_FILES)
593                .unwrap(),
594            "6"
595        );
596        assert_eq!(
597            updated.additional_properties.get(TOTAL_RECORDS).unwrap(),
598            "130"
599        );
600        assert_eq!(
601            updated.additional_properties.get(TOTAL_FILE_SIZE).unwrap(),
602            "1300"
603        );
604        assert_eq!(
605            updated
606                .additional_properties
607                .get(TOTAL_POSITION_DELETES)
608                .unwrap(),
609            "6"
610        );
611        assert_eq!(
612            updated
613                .additional_properties
614                .get(TOTAL_EQUALITY_DELETES)
615                .unwrap(),
616            "4"
617        );
618    }
619
620    #[test]
621    fn test_truncate_table_summary() {
622        let prev_props: HashMap<String, String> = [
623            (TOTAL_DATA_FILES.to_string(), "10".to_string()),
624            (TOTAL_DELETE_FILES.to_string(), "5".to_string()),
625            (TOTAL_RECORDS.to_string(), "100".to_string()),
626            (TOTAL_FILE_SIZE.to_string(), "1000".to_string()),
627            (TOTAL_POSITION_DELETES.to_string(), "3".to_string()),
628            (TOTAL_EQUALITY_DELETES.to_string(), "2".to_string()),
629        ]
630        .into_iter()
631        .collect();
632
633        let previous_summary = Summary {
634            operation: Operation::Overwrite,
635            additional_properties: prev_props,
636        };
637
638        let mut new_props = HashMap::new();
639        new_props.insert("dummy".to_string(), "value".to_string());
640        let summary = Summary {
641            operation: Operation::Overwrite,
642            additional_properties: new_props,
643        };
644
645        let truncated = truncate_table_summary(summary, &previous_summary).unwrap();
646
647        assert_eq!(
648            truncated
649                .additional_properties
650                .get(TOTAL_DATA_FILES)
651                .unwrap(),
652            "0"
653        );
654        assert_eq!(
655            truncated
656                .additional_properties
657                .get(TOTAL_DELETE_FILES)
658                .unwrap(),
659            "0"
660        );
661        assert_eq!(
662            truncated.additional_properties.get(TOTAL_RECORDS).unwrap(),
663            "0"
664        );
665        assert_eq!(
666            truncated
667                .additional_properties
668                .get(TOTAL_FILE_SIZE)
669                .unwrap(),
670            "0"
671        );
672        assert_eq!(
673            truncated
674                .additional_properties
675                .get(TOTAL_POSITION_DELETES)
676                .unwrap(),
677            "0"
678        );
679        assert_eq!(
680            truncated
681                .additional_properties
682                .get(TOTAL_EQUALITY_DELETES)
683                .unwrap(),
684            "0"
685        );
686
687        assert_eq!(
688            truncated
689                .additional_properties
690                .get(DELETED_DATA_FILES)
691                .unwrap(),
692            "10"
693        );
694        assert_eq!(
695            truncated
696                .additional_properties
697                .get(REMOVED_DELETE_FILES)
698                .unwrap(),
699            "5"
700        );
701        assert_eq!(
702            truncated
703                .additional_properties
704                .get(DELETED_RECORDS)
705                .unwrap(),
706            "100"
707        );
708        assert_eq!(
709            truncated
710                .additional_properties
711                .get(REMOVED_FILE_SIZE)
712                .unwrap(),
713            "1000"
714        );
715        assert_eq!(
716            truncated
717                .additional_properties
718                .get(REMOVED_POSITION_DELETES)
719                .unwrap(),
720            "3"
721        );
722        assert_eq!(
723            truncated
724                .additional_properties
725                .get(REMOVED_EQUALITY_DELETES)
726                .unwrap(),
727            "2"
728        );
729    }
730
731    #[test]
732    fn test_update_snapshot_summaries_overwrite_truncate_handles_totals_above_i32_max() {
733        // A table can legitimately accumulate more than i32::MAX rows or files
734        // over its lifetime. Truncating such a table on overwrite must succeed
735        // and surface the previous totals into the deleted-* counters.
736        let big = (i32::MAX as u64 + 1).to_string(); // 2_147_483_648
737        let prev_props: HashMap<String, String> = [
738            (TOTAL_DATA_FILES.to_string(), big.clone()),
739            (TOTAL_DELETE_FILES.to_string(), "0".to_string()),
740            (TOTAL_RECORDS.to_string(), big.clone()),
741            (TOTAL_FILE_SIZE.to_string(), "0".to_string()),
742            (TOTAL_POSITION_DELETES.to_string(), "0".to_string()),
743            (TOTAL_EQUALITY_DELETES.to_string(), "0".to_string()),
744        ]
745        .into_iter()
746        .collect();
747
748        let previous_summary = Summary {
749            operation: Operation::Overwrite,
750            additional_properties: prev_props,
751        };
752
753        let summary = Summary {
754            operation: Operation::Overwrite,
755            additional_properties: HashMap::new(),
756        };
757
758        let updated = update_snapshot_summaries(summary, Some(&previous_summary), true)
759            .expect("overwrite truncation should accept totals above i32::MAX");
760        assert_eq!(
761            updated
762                .additional_properties
763                .get(DELETED_DATA_FILES)
764                .unwrap(),
765            &big
766        );
767        assert_eq!(
768            updated.additional_properties.get(DELETED_RECORDS).unwrap(),
769            &big
770        );
771    }
772
773    #[test]
774    fn test_update_snapshot_summaries_overwrite_truncate_returns_err_on_malformed_total() {
775        // Non-numeric values in the previous summary (corruption, manual edits,
776        // a foreign implementation) must surface as a recoverable Err - not
777        // crash the process.
778        let prev_props: HashMap<String, String> = [
779            (TOTAL_DATA_FILES.to_string(), "not_a_number".to_string()),
780            (TOTAL_DELETE_FILES.to_string(), "0".to_string()),
781            (TOTAL_RECORDS.to_string(), "0".to_string()),
782            (TOTAL_FILE_SIZE.to_string(), "0".to_string()),
783            (TOTAL_POSITION_DELETES.to_string(), "0".to_string()),
784            (TOTAL_EQUALITY_DELETES.to_string(), "0".to_string()),
785        ]
786        .into_iter()
787        .collect();
788
789        let previous_summary = Summary {
790            operation: Operation::Overwrite,
791            additional_properties: prev_props,
792        };
793
794        let summary = Summary {
795            operation: Operation::Overwrite,
796            additional_properties: HashMap::new(),
797        };
798
799        let err = update_snapshot_summaries(summary, Some(&previous_summary), true)
800            .expect_err("malformed previous summary must produce an Err, not a panic");
801        assert!(
802            err.message().contains("truncate table summary"),
803            "expected wrapped 'Failed to truncate table summary' context, got: {}",
804            err.message()
805        );
806    }
807
808    #[test]
809    fn test_snapshot_summary_collector_build() {
810        let schema = Arc::new(
811            Schema::builder()
812                .with_fields(vec![
813                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
814                    NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
815                ])
816                .build()
817                .unwrap(),
818        );
819
820        let partition_spec = Arc::new(
821            PartitionSpec::builder(schema.clone())
822                .add_unbound_fields(vec![
823                    UnboundPartitionField::builder()
824                        .source_id(2)
825                        .name("year".to_string())
826                        .transform(Transform::Identity)
827                        .build(),
828                ])
829                .unwrap()
830                .with_spec_id(1)
831                .build()
832                .unwrap(),
833        );
834
835        let mut collector = SnapshotSummaryCollector::default();
836        collector.set_partition_summary_limit(10);
837
838        let file1 = DataFile {
839            content: DataContentType::Data,
840            file_path: "s3://testbucket/path/to/file1.parquet".to_string(),
841            file_format: DataFileFormat::Parquet,
842            partition: Struct::from_iter(vec![]),
843            record_count: 10,
844            file_size_in_bytes: 100,
845            column_sizes: HashMap::from([(1, 46), (2, 48), (3, 48)]),
846            value_counts: HashMap::from([(1, 10), (2, 10), (3, 10)]),
847            null_value_counts: HashMap::from([(1, 0), (2, 0), (3, 0)]),
848            nan_value_counts: HashMap::new(),
849            lower_bounds: HashMap::from([
850                (1, Datum::long(1)),
851                (2, Datum::string("a")),
852                (3, Datum::string("x")),
853            ]),
854            upper_bounds: HashMap::from([
855                (1, Datum::long(1)),
856                (2, Datum::string("a")),
857                (3, Datum::string("x")),
858            ]),
859            key_metadata: None,
860            split_offsets: Some(vec![4]),
861            equality_ids: None,
862            sort_order_id: Some(0),
863            partition_spec_id: 0,
864            first_row_id: None,
865            referenced_data_file: None,
866            content_offset: None,
867            content_size_in_bytes: None,
868        };
869
870        let file2 = DataFile {
871            content: DataContentType::Data,
872            file_path: "s3://testbucket/path/to/file2.parquet".to_string(),
873            file_format: DataFileFormat::Parquet,
874            partition: Struct::from_iter(vec![Some(Literal::string("2025"))]),
875            record_count: 20,
876            file_size_in_bytes: 200,
877            column_sizes: HashMap::from([(1, 46), (2, 48), (3, 48)]),
878            value_counts: HashMap::from([(1, 20), (2, 20), (3, 20)]),
879            null_value_counts: HashMap::from([(1, 0), (2, 0), (3, 0)]),
880            nan_value_counts: HashMap::new(),
881            lower_bounds: HashMap::from([
882                (1, Datum::long(1)),
883                (2, Datum::string("a")),
884                (3, Datum::string("x")),
885            ]),
886            upper_bounds: HashMap::from([
887                (1, Datum::long(1)),
888                (2, Datum::string("a")),
889                (3, Datum::string("x")),
890            ]),
891            key_metadata: None,
892            split_offsets: Some(vec![4]),
893            equality_ids: None,
894            sort_order_id: Some(0),
895            partition_spec_id: 0,
896            first_row_id: None,
897            referenced_data_file: None,
898            content_offset: None,
899            content_size_in_bytes: None,
900        };
901
902        collector.add_file(&file1, schema.clone(), partition_spec.clone());
903        collector.add_file(&file2, schema.clone(), partition_spec.clone());
904
905        collector.remove_file(&file1, schema.clone(), partition_spec.clone());
906
907        let props = collector.build();
908
909        assert_eq!(props.get(ADDED_FILE_SIZE).unwrap(), "300");
910        assert_eq!(props.get(REMOVED_FILE_SIZE).unwrap(), "100");
911
912        let partition_key = format!("{}{}", CHANGED_PARTITION_PREFIX, "year=2025");
913
914        assert!(props.contains_key(&partition_key));
915
916        let partition_summary = props.get(&partition_key).unwrap();
917        assert!(partition_summary.contains(&format!("{ADDED_FILE_SIZE}=200")));
918        assert!(partition_summary.contains(&format!("{ADDED_DATA_FILES}=1")));
919        assert!(partition_summary.contains(&format!("{ADDED_RECORDS}=20")));
920    }
921
922    #[test]
923    fn test_snapshot_summary_collector_add_manifest() {
924        let mut collector = SnapshotSummaryCollector::default();
925        collector.set_partition_summary_limit(10);
926
927        let manifest = ManifestFile {
928            manifest_path: "file://dummy.manifest".to_string(),
929            manifest_length: 0,
930            partition_spec_id: 0,
931            content: ManifestContentType::Data,
932            sequence_number: 0,
933            min_sequence_number: 0,
934            added_snapshot_id: 0,
935            added_files_count: Some(3),
936            existing_files_count: Some(0),
937            deleted_files_count: Some(1),
938            added_rows_count: Some(100),
939            existing_rows_count: Some(0),
940            deleted_rows_count: Some(50),
941            partitions: Some(Vec::new()),
942            key_metadata: None,
943            first_row_id: None,
944        };
945
946        collector
947            .partition_metrics
948            .insert("dummy".to_string(), UpdateMetrics::default());
949        collector.add_manifest(&manifest);
950
951        let props = collector.build();
952        assert_eq!(props.get(ADDED_DATA_FILES).unwrap(), "3");
953        assert_eq!(props.get(DELETED_DATA_FILES).unwrap(), "1");
954        assert_eq!(props.get(ADDED_RECORDS).unwrap(), "100");
955        assert_eq!(props.get(DELETED_RECORDS).unwrap(), "50");
956    }
957
958    #[test]
959    fn test_snapshot_summary_collector_merge() {
960        let schema = Arc::new(
961            Schema::builder()
962                .with_fields(vec![
963                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
964                    NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
965                ])
966                .build()
967                .unwrap(),
968        );
969
970        let partition_spec = Arc::new(
971            PartitionSpec::builder(schema.clone())
972                .add_unbound_fields(vec![
973                    UnboundPartitionField::builder()
974                        .source_id(2)
975                        .name("year".to_string())
976                        .transform(Transform::Identity)
977                        .build(),
978                ])
979                .unwrap()
980                .with_spec_id(1)
981                .build()
982                .unwrap(),
983        );
984
985        let mut summary_one = SnapshotSummaryCollector::default();
986        let mut summary_two = SnapshotSummaryCollector::default();
987
988        summary_one.add_file(
989            &DataFile {
990                content: DataContentType::Data,
991                file_path: "test.parquet".into(),
992                file_format: DataFileFormat::Parquet,
993                partition: Struct::from_iter(vec![]),
994                record_count: 10,
995                file_size_in_bytes: 100,
996                column_sizes: HashMap::new(),
997                value_counts: HashMap::new(),
998                null_value_counts: HashMap::new(),
999                nan_value_counts: HashMap::new(),
1000                lower_bounds: HashMap::new(),
1001                upper_bounds: HashMap::new(),
1002                key_metadata: None,
1003                split_offsets: None,
1004                equality_ids: None,
1005                sort_order_id: None,
1006                partition_spec_id: 0,
1007                first_row_id: None,
1008                referenced_data_file: None,
1009                content_offset: None,
1010                content_size_in_bytes: None,
1011            },
1012            schema.clone(),
1013            partition_spec.clone(),
1014        );
1015
1016        summary_two.add_file(
1017            &DataFile {
1018                content: DataContentType::Data,
1019                file_path: "test.parquet".into(),
1020                file_format: DataFileFormat::Parquet,
1021                partition: Struct::from_iter(vec![]),
1022                record_count: 20,
1023                file_size_in_bytes: 200,
1024                column_sizes: HashMap::new(),
1025                value_counts: HashMap::new(),
1026                null_value_counts: HashMap::new(),
1027                nan_value_counts: HashMap::new(),
1028                lower_bounds: HashMap::new(),
1029                upper_bounds: HashMap::new(),
1030                key_metadata: None,
1031                split_offsets: None,
1032                equality_ids: None,
1033                sort_order_id: None,
1034                partition_spec_id: 0,
1035                first_row_id: None,
1036                referenced_data_file: None,
1037                content_offset: None,
1038                content_size_in_bytes: None,
1039            },
1040            schema.clone(),
1041            partition_spec.clone(),
1042        );
1043
1044        summary_one.merge(summary_two);
1045        let props = summary_one.build();
1046        assert_eq!(props.get(ADDED_DATA_FILES).unwrap(), "2");
1047        assert_eq!(props.get(ADDED_RECORDS).unwrap(), "30");
1048
1049        let mut summary_three = SnapshotSummaryCollector::default();
1050        let mut summary_four = SnapshotSummaryCollector::default();
1051
1052        summary_three.add_manifest(&ManifestFile {
1053            manifest_path: "test.manifest".to_string(),
1054            manifest_length: 0,
1055            partition_spec_id: 0,
1056            content: ManifestContentType::Data,
1057            sequence_number: 0,
1058            min_sequence_number: 0,
1059            added_snapshot_id: 0,
1060            added_files_count: Some(1),
1061            existing_files_count: Some(0),
1062            deleted_files_count: Some(0),
1063            added_rows_count: Some(5),
1064            existing_rows_count: Some(0),
1065            deleted_rows_count: Some(0),
1066            partitions: Some(Vec::new()),
1067            key_metadata: None,
1068            first_row_id: None,
1069        });
1070
1071        summary_four.add_file(
1072            &DataFile {
1073                content: DataContentType::Data,
1074                file_path: "test.parquet".into(),
1075                file_format: DataFileFormat::Parquet,
1076                partition: Struct::from_iter(vec![]),
1077                record_count: 1,
1078                file_size_in_bytes: 10,
1079                column_sizes: HashMap::new(),
1080                value_counts: HashMap::new(),
1081                null_value_counts: HashMap::new(),
1082                nan_value_counts: HashMap::new(),
1083                lower_bounds: HashMap::new(),
1084                upper_bounds: HashMap::new(),
1085                key_metadata: None,
1086                split_offsets: None,
1087                equality_ids: None,
1088                sort_order_id: None,
1089                partition_spec_id: 0,
1090                first_row_id: None,
1091                referenced_data_file: None,
1092                content_offset: None,
1093                content_size_in_bytes: None,
1094            },
1095            schema.clone(),
1096            partition_spec.clone(),
1097        );
1098
1099        summary_three.merge(summary_four);
1100        let props = summary_three.build();
1101
1102        assert_eq!(props.get(ADDED_DATA_FILES).unwrap(), "2");
1103        assert_eq!(props.get(ADDED_RECORDS).unwrap(), "6");
1104        assert!(
1105            props
1106                .iter()
1107                .all(|(k, _)| !k.starts_with(CHANGED_PARTITION_PREFIX))
1108        );
1109    }
1110
1111    #[test]
1112    fn test_update_totals_skipped_when_previous_summary_missing_totals() {
1113        let prev_props: HashMap<String, String> = [(TOTAL_DATA_FILES, "8")]
1114            .into_iter()
1115            .map(|(k, v)| (k.to_string(), v.to_string()))
1116            .collect();
1117
1118        let previous_summary = Summary {
1119            operation: Operation::Overwrite,
1120            additional_properties: prev_props,
1121        };
1122
1123        let new_props: HashMap<String, String> = [
1124            (ADDED_DATA_FILES, "4"),
1125            (ADDED_DELETE_FILES, "2"),
1126            (ADDED_RECORDS, "40"),
1127            (ADDED_FILE_SIZE, "400"),
1128            (ADDED_POSITION_DELETES, "5"),
1129            (ADDED_EQUALITY_DELETES, "3"),
1130        ]
1131        .into_iter()
1132        .map(|(k, v)| (k.to_string(), v.to_string()))
1133        .collect();
1134
1135        let summary = Summary {
1136            operation: Operation::Append,
1137            additional_properties: new_props,
1138        };
1139
1140        let updated = update_snapshot_summaries(summary, Some(&previous_summary), false).unwrap();
1141        let props = &updated.additional_properties;
1142
1143        assert_eq!(props.get(TOTAL_DATA_FILES).unwrap(), "12");
1144
1145        for total_field in [
1146            TOTAL_DELETE_FILES,
1147            TOTAL_RECORDS,
1148            TOTAL_FILE_SIZE,
1149            TOTAL_POSITION_DELETES,
1150            TOTAL_EQUALITY_DELETES,
1151        ] {
1152            assert!(
1153                !props.contains_key(total_field),
1154                "{total_field} should not be set when previous summary lacks it",
1155            );
1156        }
1157    }
1158
1159    #[test]
1160    fn test_update_totals_computed_when_no_previous_summary() {
1161        let new_props: HashMap<String, String> = [
1162            (ADDED_DATA_FILES, "4"),
1163            (ADDED_RECORDS, "40"),
1164            (ADDED_FILE_SIZE, "400"),
1165        ]
1166        .into_iter()
1167        .map(|(k, v)| (k.to_string(), v.to_string()))
1168        .collect();
1169
1170        let summary = Summary {
1171            operation: Operation::Append,
1172            additional_properties: new_props,
1173        };
1174
1175        let updated = update_snapshot_summaries(summary, None, false).unwrap();
1176        let props = &updated.additional_properties;
1177
1178        assert_eq!(props.get(TOTAL_DATA_FILES).unwrap(), "4");
1179        assert_eq!(props.get(TOTAL_RECORDS).unwrap(), "40");
1180        assert_eq!(props.get(TOTAL_FILE_SIZE).unwrap(), "400");
1181    }
1182
1183    #[test]
1184    fn test_update_totals_with_removes_only() {
1185        let prev_props: HashMap<String, String> = [
1186            (TOTAL_DATA_FILES, "10"),
1187            (TOTAL_DELETE_FILES, "5"),
1188            (TOTAL_RECORDS, "100"),
1189            (TOTAL_FILE_SIZE, "1000"),
1190            (TOTAL_POSITION_DELETES, "3"),
1191            (TOTAL_EQUALITY_DELETES, "2"),
1192        ]
1193        .into_iter()
1194        .map(|(k, v)| (k.to_string(), v.to_string()))
1195        .collect();
1196
1197        let previous_summary = Summary {
1198            operation: Operation::Overwrite,
1199            additional_properties: prev_props,
1200        };
1201
1202        let new_props: HashMap<String, String> = [
1203            (DELETED_DATA_FILES, "2"),
1204            (REMOVED_DELETE_FILES, "1"),
1205            (DELETED_RECORDS, "20"),
1206            (REMOVED_FILE_SIZE, "200"),
1207            (REMOVED_POSITION_DELETES, "1"),
1208            (REMOVED_EQUALITY_DELETES, "1"),
1209        ]
1210        .into_iter()
1211        .map(|(k, v)| (k.to_string(), v.to_string()))
1212        .collect();
1213
1214        let summary = Summary {
1215            operation: Operation::Delete,
1216            additional_properties: new_props,
1217        };
1218
1219        let updated = update_snapshot_summaries(summary, Some(&previous_summary), false).unwrap();
1220        let props = &updated.additional_properties;
1221
1222        assert_eq!(props.get(TOTAL_DATA_FILES).unwrap(), "8");
1223        assert_eq!(props.get(TOTAL_DELETE_FILES).unwrap(), "4");
1224        assert_eq!(props.get(TOTAL_RECORDS).unwrap(), "80");
1225        assert_eq!(props.get(TOTAL_FILE_SIZE).unwrap(), "800");
1226        assert_eq!(props.get(TOTAL_POSITION_DELETES).unwrap(), "2");
1227        assert_eq!(props.get(TOTAL_EQUALITY_DELETES).unwrap(), "1");
1228    }
1229}