Skip to main content

iceberg/spec/
snapshot_summary.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19
20use itertools::Itertools;
21
22use super::{DataContentType, DataFile, PartitionSpecRef};
23use crate::spec::{ManifestContentType, ManifestFile, Operation, SchemaRef, Summary};
24use crate::{Error, ErrorKind, Result};
25
// Summary property keys describing what a single snapshot update added or removed.
// They are emitted by `UpdateMetrics::to_map` and read back as the added/removed
// deltas when totals are recomputed.
const ADDED_DATA_FILES: &str = "added-data-files";
const ADDED_DELETE_FILES: &str = "added-delete-files";
const ADDED_EQUALITY_DELETES: &str = "added-equality-deletes";
const ADDED_FILE_SIZE: &str = "added-files-size";
const ADDED_POSITION_DELETES: &str = "added-position-deletes";
const ADDED_POSITION_DELETE_FILES: &str = "added-position-delete-files";
const ADDED_RECORDS: &str = "added-records";
const DELETED_DATA_FILES: &str = "deleted-data-files";
const DELETED_RECORDS: &str = "deleted-records";
const ADDED_EQUALITY_DELETE_FILES: &str = "added-equality-delete-files";
const REMOVED_DELETE_FILES: &str = "removed-delete-files";
const REMOVED_EQUALITY_DELETES: &str = "removed-equality-deletes";
const REMOVED_EQUALITY_DELETE_FILES: &str = "removed-equality-delete-files";
const REMOVED_FILE_SIZE: &str = "removed-files-size";
const REMOVED_POSITION_DELETES: &str = "removed-position-deletes";
const REMOVED_POSITION_DELETE_FILES: &str = "removed-position-delete-files";
// Summary property keys holding running totals. `update_snapshot_summaries`
// derives each one from the previous snapshot's total and the deltas above.
const TOTAL_EQUALITY_DELETES: &str = "total-equality-deletes";
const TOTAL_POSITION_DELETES: &str = "total-position-deletes";
const TOTAL_DATA_FILES: &str = "total-data-files";
const TOTAL_DELETE_FILES: &str = "total-delete-files";
const TOTAL_RECORDS: &str = "total-records";
const TOTAL_FILE_SIZE: &str = "total-files-size";
// Key under which the number of partitions with collected metrics is reported.
const CHANGED_PARTITION_COUNT_PROP: &str = "changed-partition-count";
// Prefix prepended to a partition path to form the key of its per-partition summary.
const CHANGED_PARTITION_PREFIX: &str = "partitions.";
50
51/// `SnapshotSummaryCollector` collects and aggregates snapshot update metrics.
52/// It gathers metrics about added or removed data files and manifests, and tracks
53/// partition-specific updates.
54#[derive(Default)]
55pub struct SnapshotSummaryCollector {
56    metrics: UpdateMetrics,
57    partition_metrics: HashMap<String, UpdateMetrics>,
58    max_changed_partitions_for_summaries: u64,
59    properties: HashMap<String, String>,
60    trust_partition_metrics: bool,
61}
62
63impl SnapshotSummaryCollector {
64    /// Set properties for snapshot summary
65    pub fn set(&mut self, key: &str, value: &str) {
66        self.properties.insert(key.to_string(), value.to_string());
67    }
68
69    /// Sets the limit for including partition summaries. Summaries are not
70    /// included if the number of partitions is exceeded.
71    pub fn set_partition_summary_limit(&mut self, limit: u64) {
72        self.max_changed_partitions_for_summaries = limit;
73    }
74
75    /// Adds a data file to the summary collector
76    pub fn add_file(
77        &mut self,
78        data_file: &DataFile,
79        schema: SchemaRef,
80        partition_spec: PartitionSpecRef,
81    ) {
82        self.metrics.add_file(data_file);
83        if !data_file.partition.fields().is_empty() {
84            self.update_partition_metrics(schema, partition_spec, data_file, true);
85        }
86    }
87
88    /// Removes a data file from the summary collector
89    pub fn remove_file(
90        &mut self,
91        data_file: &DataFile,
92        schema: SchemaRef,
93        partition_spec: PartitionSpecRef,
94    ) {
95        self.metrics.remove_file(data_file);
96        if !data_file.partition.fields().is_empty() {
97            self.update_partition_metrics(schema, partition_spec, data_file, false);
98        }
99    }
100
101    /// Adds a manifest to the summary collector
102    pub fn add_manifest(&mut self, manifest: &ManifestFile) {
103        self.trust_partition_metrics = false;
104        self.partition_metrics.clear();
105        self.metrics.add_manifest(manifest);
106    }
107
108    /// Updates partition-specific metrics for a data file.
109    pub fn update_partition_metrics(
110        &mut self,
111        schema: SchemaRef,
112        partition_spec: PartitionSpecRef,
113        data_file: &DataFile,
114        is_add_file: bool,
115    ) {
116        let partition_path = partition_spec.partition_to_path(&data_file.partition, schema);
117        let metrics = self.partition_metrics.entry(partition_path).or_default();
118
119        if is_add_file {
120            metrics.add_file(data_file);
121        } else {
122            metrics.remove_file(data_file);
123        }
124    }
125
126    /// Merges another `SnapshotSummaryCollector` into the current one
127    pub fn merge(&mut self, summary: SnapshotSummaryCollector) {
128        self.metrics.merge(&summary.metrics);
129        self.properties.extend(summary.properties);
130
131        if self.trust_partition_metrics && summary.trust_partition_metrics {
132            for (partition, partition_metric) in summary.partition_metrics.iter() {
133                self.partition_metrics
134                    .entry(partition.to_string())
135                    .or_default()
136                    .merge(partition_metric);
137            }
138        } else {
139            self.partition_metrics.clear();
140            self.trust_partition_metrics = false;
141        }
142    }
143
144    /// Builds final map of summaries
145    pub fn build(&self) -> HashMap<String, String> {
146        let mut properties = self.metrics.to_map();
147        let changed_partitions_count = self.partition_metrics.len() as u64;
148        set_if_positive(
149            &mut properties,
150            changed_partitions_count,
151            CHANGED_PARTITION_COUNT_PROP,
152        );
153
154        if changed_partitions_count <= self.max_changed_partitions_for_summaries {
155            for (partition_path, update_metrics_partition) in &self.partition_metrics {
156                let property_key = format!("{CHANGED_PARTITION_PREFIX}{partition_path}");
157                let partition_summary = update_metrics_partition
158                    .to_map()
159                    .into_iter()
160                    .map(|(property, value)| format!("{property}={value}"))
161                    .join(",");
162
163                if !partition_summary.is_empty() {
164                    properties.insert(property_key, partition_summary);
165                }
166            }
167        }
168        properties
169    }
170}
171
172#[derive(Debug, Default)]
173struct UpdateMetrics {
174    added_file_size: u64,
175    removed_file_size: u64,
176    added_data_files: u32,
177    removed_data_files: u32,
178    added_eq_delete_files: u64,
179    removed_eq_delete_files: u64,
180    added_pos_delete_files: u64,
181    removed_pos_delete_files: u64,
182    added_delete_files: u32,
183    removed_delete_files: u32,
184    added_records: u64,
185    deleted_records: u64,
186    added_pos_deletes: u64,
187    removed_pos_deletes: u64,
188    added_eq_deletes: u64,
189    removed_eq_deletes: u64,
190}
191
192impl UpdateMetrics {
193    fn add_file(&mut self, data_file: &DataFile) {
194        self.added_file_size += data_file.file_size_in_bytes;
195        match data_file.content_type() {
196            DataContentType::Data => {
197                self.added_data_files += 1;
198                self.added_records += data_file.record_count;
199            }
200            DataContentType::PositionDeletes => {
201                self.added_delete_files += 1;
202                self.added_pos_delete_files += 1;
203                self.added_pos_deletes += data_file.record_count;
204            }
205            DataContentType::EqualityDeletes => {
206                self.added_delete_files += 1;
207                self.added_eq_delete_files += 1;
208                self.added_eq_deletes += data_file.record_count;
209            }
210        }
211    }
212
213    fn remove_file(&mut self, data_file: &DataFile) {
214        self.removed_file_size += data_file.file_size_in_bytes;
215        match data_file.content_type() {
216            DataContentType::Data => {
217                self.removed_data_files += 1;
218                self.deleted_records += data_file.record_count;
219            }
220            DataContentType::PositionDeletes => {
221                self.removed_delete_files += 1;
222                self.removed_pos_delete_files += 1;
223                self.removed_pos_deletes += data_file.record_count;
224            }
225            DataContentType::EqualityDeletes => {
226                self.removed_delete_files += 1;
227                self.removed_eq_delete_files += 1;
228                self.removed_eq_deletes += data_file.record_count;
229            }
230        }
231    }
232
233    fn add_manifest(&mut self, manifest: &ManifestFile) {
234        match manifest.content {
235            ManifestContentType::Data => {
236                self.added_data_files += manifest.added_files_count.unwrap_or(0);
237                self.added_records += manifest.added_rows_count.unwrap_or(0);
238                self.removed_data_files += manifest.deleted_files_count.unwrap_or(0);
239                self.deleted_records += manifest.deleted_rows_count.unwrap_or(0);
240            }
241            ManifestContentType::Deletes => {
242                self.added_delete_files += manifest.added_files_count.unwrap_or(0);
243                self.removed_delete_files += manifest.deleted_files_count.unwrap_or(0);
244            }
245        }
246    }
247
248    fn to_map(&self) -> HashMap<String, String> {
249        let mut properties = HashMap::new();
250        set_if_positive(&mut properties, self.added_file_size, ADDED_FILE_SIZE);
251        set_if_positive(&mut properties, self.removed_file_size, REMOVED_FILE_SIZE);
252        set_if_positive(&mut properties, self.added_data_files, ADDED_DATA_FILES);
253        set_if_positive(&mut properties, self.removed_data_files, DELETED_DATA_FILES);
254        set_if_positive(
255            &mut properties,
256            self.added_eq_delete_files,
257            ADDED_EQUALITY_DELETE_FILES,
258        );
259        set_if_positive(
260            &mut properties,
261            self.removed_eq_delete_files,
262            REMOVED_EQUALITY_DELETE_FILES,
263        );
264        set_if_positive(
265            &mut properties,
266            self.added_pos_delete_files,
267            ADDED_POSITION_DELETE_FILES,
268        );
269        set_if_positive(
270            &mut properties,
271            self.removed_pos_delete_files,
272            REMOVED_POSITION_DELETE_FILES,
273        );
274        set_if_positive(&mut properties, self.added_delete_files, ADDED_DELETE_FILES);
275        set_if_positive(
276            &mut properties,
277            self.removed_delete_files,
278            REMOVED_DELETE_FILES,
279        );
280        set_if_positive(&mut properties, self.added_records, ADDED_RECORDS);
281        set_if_positive(&mut properties, self.deleted_records, DELETED_RECORDS);
282        set_if_positive(
283            &mut properties,
284            self.added_pos_deletes,
285            ADDED_POSITION_DELETES,
286        );
287        set_if_positive(
288            &mut properties,
289            self.removed_pos_deletes,
290            REMOVED_POSITION_DELETES,
291        );
292        set_if_positive(
293            &mut properties,
294            self.added_eq_deletes,
295            ADDED_EQUALITY_DELETES,
296        );
297        set_if_positive(
298            &mut properties,
299            self.removed_eq_deletes,
300            REMOVED_EQUALITY_DELETES,
301        );
302        properties
303    }
304
305    fn merge(&mut self, other: &UpdateMetrics) {
306        self.added_file_size += other.added_file_size;
307        self.removed_file_size += other.removed_file_size;
308        self.added_data_files += other.added_data_files;
309        self.removed_data_files += other.removed_data_files;
310        self.added_eq_delete_files += other.added_eq_delete_files;
311        self.removed_eq_delete_files += other.removed_eq_delete_files;
312        self.added_pos_delete_files += other.added_pos_delete_files;
313        self.removed_pos_delete_files += other.removed_pos_delete_files;
314        self.added_delete_files += other.added_delete_files;
315        self.removed_delete_files += other.removed_delete_files;
316        self.added_records += other.added_records;
317        self.deleted_records += other.deleted_records;
318        self.added_pos_deletes += other.added_pos_deletes;
319        self.removed_pos_deletes += other.removed_pos_deletes;
320        self.added_eq_deletes += other.added_eq_deletes;
321        self.removed_eq_deletes += other.removed_eq_deletes;
322    }
323}
324
/// Stores `value` under `property_name` when it is strictly greater than the
/// type's default (zero for the numeric counters used in this file); otherwise
/// leaves `properties` untouched.
fn set_if_positive<T: PartialOrd + Default + ToString>(
    properties: &mut HashMap<String, String>,
    value: T,
    property_name: &str,
) {
    let is_positive = value > T::default();
    if is_positive {
        let key = property_name.to_string();
        let rendered = value.to_string();
        properties.insert(key, rendered);
    }
}
331
332pub(crate) fn update_snapshot_summaries(
333    summary: Summary,
334    previous_summary: Option<&Summary>,
335    truncate_full_table: bool,
336) -> Result<Summary> {
337    // Validate that the operation is supported
338    if summary.operation != Operation::Append
339        && summary.operation != Operation::Overwrite
340        && summary.operation != Operation::Delete
341    {
342        return Err(Error::new(
343            ErrorKind::DataInvalid,
344            "Operation is not supported.",
345        ));
346    }
347
348    let mut summary = match previous_summary {
349        Some(prev_summary) if truncate_full_table && summary.operation == Operation::Overwrite => {
350            truncate_table_summary(summary, prev_summary).map_err(|err| {
351                Error::new(ErrorKind::Unexpected, "Failed to truncate table summary.")
352                    .with_source(err)
353            })?
354        }
355        _ => summary,
356    };
357
358    update_totals(
359        &mut summary,
360        previous_summary,
361        TOTAL_DATA_FILES,
362        ADDED_DATA_FILES,
363        DELETED_DATA_FILES,
364    );
365
366    update_totals(
367        &mut summary,
368        previous_summary,
369        TOTAL_DELETE_FILES,
370        ADDED_DELETE_FILES,
371        REMOVED_DELETE_FILES,
372    );
373
374    update_totals(
375        &mut summary,
376        previous_summary,
377        TOTAL_RECORDS,
378        ADDED_RECORDS,
379        DELETED_RECORDS,
380    );
381
382    update_totals(
383        &mut summary,
384        previous_summary,
385        TOTAL_FILE_SIZE,
386        ADDED_FILE_SIZE,
387        REMOVED_FILE_SIZE,
388    );
389
390    update_totals(
391        &mut summary,
392        previous_summary,
393        TOTAL_POSITION_DELETES,
394        ADDED_POSITION_DELETES,
395        REMOVED_POSITION_DELETES,
396    );
397
398    update_totals(
399        &mut summary,
400        previous_summary,
401        TOTAL_EQUALITY_DELETES,
402        ADDED_EQUALITY_DELETES,
403        REMOVED_EQUALITY_DELETES,
404    );
405    Ok(summary)
406}
407
408fn get_prop(previous_summary: &Summary, prop: &str) -> Result<u64> {
409    let value_str = previous_summary
410        .additional_properties
411        .get(prop)
412        .map(String::as_str)
413        .unwrap_or("0");
414    value_str.parse::<u64>().map_err(|err| {
415        Error::new(
416            ErrorKind::Unexpected,
417            format!("Failed to parse summary property '{prop}' value '{value_str}' as u64."),
418        )
419        .with_source(err)
420    })
421}
422
423fn truncate_table_summary(mut summary: Summary, previous_summary: &Summary) -> Result<Summary> {
424    for prop in [
425        TOTAL_DATA_FILES,
426        TOTAL_DELETE_FILES,
427        TOTAL_RECORDS,
428        TOTAL_FILE_SIZE,
429        TOTAL_POSITION_DELETES,
430        TOTAL_EQUALITY_DELETES,
431    ] {
432        summary
433            .additional_properties
434            .insert(prop.to_string(), "0".to_string());
435    }
436
437    let value = get_prop(previous_summary, TOTAL_DATA_FILES)?;
438    if value != 0 {
439        summary
440            .additional_properties
441            .insert(DELETED_DATA_FILES.to_string(), value.to_string());
442    }
443    let value = get_prop(previous_summary, TOTAL_DELETE_FILES)?;
444    if value != 0 {
445        summary
446            .additional_properties
447            .insert(REMOVED_DELETE_FILES.to_string(), value.to_string());
448    }
449    let value = get_prop(previous_summary, TOTAL_RECORDS)?;
450    if value != 0 {
451        summary
452            .additional_properties
453            .insert(DELETED_RECORDS.to_string(), value.to_string());
454    }
455    let value = get_prop(previous_summary, TOTAL_FILE_SIZE)?;
456    if value != 0 {
457        summary
458            .additional_properties
459            .insert(REMOVED_FILE_SIZE.to_string(), value.to_string());
460    }
461
462    let value = get_prop(previous_summary, TOTAL_POSITION_DELETES)?;
463    if value != 0 {
464        summary
465            .additional_properties
466            .insert(REMOVED_POSITION_DELETES.to_string(), value.to_string());
467    }
468
469    let value = get_prop(previous_summary, TOTAL_EQUALITY_DELETES)?;
470    if value != 0 {
471        summary
472            .additional_properties
473            .insert(REMOVED_EQUALITY_DELETES.to_string(), value.to_string());
474    }
475
476    Ok(summary)
477}
478
479fn update_totals(
480    summary: &mut Summary,
481    previous_summary: Option<&Summary>,
482    total_property: &str,
483    added_property: &str,
484    removed_property: &str,
485) {
486    let previous_total = match previous_summary {
487        None => 0,
488        Some(prev_summary) => match prev_summary.additional_properties.get(total_property) {
489            Some(value_str) => match value_str.parse::<u64>() {
490                Ok(v) => v,
491                Err(parse_err) => {
492                    tracing::warn!(
493                        "Property '{total_property}' could not be parsed in the previous snapshot summary: {parse_err}. \
494                         Skipping total computation.",
495                    );
496                    return;
497                }
498            },
499            None => {
500                tracing::debug!(
501                    "Property '{total_property}' was not set in the previous snapshot summary. \
502                     Skipping total computation."
503                );
504                return;
505            }
506        },
507    };
508
509    // Parse the added/removed deltas, tolerating an unparsable value by skipping
510    // the total entirely rather than panicking. Computed metrics always overwrite
511    // user-supplied summary properties (see `SnapshotProducer::summary`), so a bad
512    // value should only ever come from a previous snapshot's summary; matching
513    // iceberg-java's `updateTotal`, we ignore it instead of failing the commit.
514    let parse_delta = |property: &str| -> Option<u64> {
515        match summary.additional_properties.get(property) {
516            None => Some(0),
517            Some(value) => match value.parse::<u64>() {
518                Ok(v) => Some(v),
519                Err(parse_err) => {
520                    tracing::warn!(
521                        "Property '{property}' could not be parsed when computing '{total_property}': {parse_err}. \
522                         Skipping total computation.",
523                    );
524                    None
525                }
526            },
527        }
528    };
529
530    let (Some(added), Some(removed)) = (parse_delta(added_property), parse_delta(removed_property))
531    else {
532        return;
533    };
534
535    let new_total = previous_total + added - removed;
536    summary
537        .additional_properties
538        .insert(total_property.to_string(), new_total.to_string());
539}
540
541#[cfg(test)]
542mod tests {
543    use std::collections::HashMap;
544    use std::sync::Arc;
545
546    use super::*;
547    use crate::spec::{
548        DataFileFormat, Datum, Literal, NestedField, PartitionSpec, PrimitiveType, Schema, Struct,
549        Transform, Type, UnboundPartitionField,
550    };
551
552    #[test]
553    fn test_update_snapshot_summaries_append() {
554        let prev_props: HashMap<String, String> = [
555            (TOTAL_DATA_FILES.to_string(), "10".to_string()),
556            (TOTAL_DELETE_FILES.to_string(), "5".to_string()),
557            (TOTAL_RECORDS.to_string(), "100".to_string()),
558            (TOTAL_FILE_SIZE.to_string(), "1000".to_string()),
559            (TOTAL_POSITION_DELETES.to_string(), "3".to_string()),
560            (TOTAL_EQUALITY_DELETES.to_string(), "2".to_string()),
561        ]
562        .into_iter()
563        .collect();
564
565        let previous_summary = Summary {
566            operation: Operation::Append,
567            additional_properties: prev_props,
568        };
569
570        let new_props: HashMap<String, String> = [
571            (ADDED_DATA_FILES.to_string(), "4".to_string()),
572            (DELETED_DATA_FILES.to_string(), "1".to_string()),
573            (ADDED_DELETE_FILES.to_string(), "2".to_string()),
574            (REMOVED_DELETE_FILES.to_string(), "1".to_string()),
575            (ADDED_RECORDS.to_string(), "40".to_string()),
576            (DELETED_RECORDS.to_string(), "10".to_string()),
577            (ADDED_FILE_SIZE.to_string(), "400".to_string()),
578            (REMOVED_FILE_SIZE.to_string(), "100".to_string()),
579            (ADDED_POSITION_DELETES.to_string(), "5".to_string()),
580            (REMOVED_POSITION_DELETES.to_string(), "2".to_string()),
581            (ADDED_EQUALITY_DELETES.to_string(), "3".to_string()),
582            (REMOVED_EQUALITY_DELETES.to_string(), "1".to_string()),
583        ]
584        .into_iter()
585        .collect();
586
587        let summary = Summary {
588            operation: Operation::Append,
589            additional_properties: new_props,
590        };
591
592        let updated = update_snapshot_summaries(summary, Some(&previous_summary), false).unwrap();
593
594        assert_eq!(
595            updated.additional_properties.get(TOTAL_DATA_FILES).unwrap(),
596            "13"
597        );
598        assert_eq!(
599            updated
600                .additional_properties
601                .get(TOTAL_DELETE_FILES)
602                .unwrap(),
603            "6"
604        );
605        assert_eq!(
606            updated.additional_properties.get(TOTAL_RECORDS).unwrap(),
607            "130"
608        );
609        assert_eq!(
610            updated.additional_properties.get(TOTAL_FILE_SIZE).unwrap(),
611            "1300"
612        );
613        assert_eq!(
614            updated
615                .additional_properties
616                .get(TOTAL_POSITION_DELETES)
617                .unwrap(),
618            "6"
619        );
620        assert_eq!(
621            updated
622                .additional_properties
623                .get(TOTAL_EQUALITY_DELETES)
624                .unwrap(),
625            "4"
626        );
627    }
628
629    #[test]
630    fn test_truncate_table_summary() {
631        let prev_props: HashMap<String, String> = [
632            (TOTAL_DATA_FILES.to_string(), "10".to_string()),
633            (TOTAL_DELETE_FILES.to_string(), "5".to_string()),
634            (TOTAL_RECORDS.to_string(), "100".to_string()),
635            (TOTAL_FILE_SIZE.to_string(), "1000".to_string()),
636            (TOTAL_POSITION_DELETES.to_string(), "3".to_string()),
637            (TOTAL_EQUALITY_DELETES.to_string(), "2".to_string()),
638        ]
639        .into_iter()
640        .collect();
641
642        let previous_summary = Summary {
643            operation: Operation::Overwrite,
644            additional_properties: prev_props,
645        };
646
647        let mut new_props = HashMap::new();
648        new_props.insert("dummy".to_string(), "value".to_string());
649        let summary = Summary {
650            operation: Operation::Overwrite,
651            additional_properties: new_props,
652        };
653
654        let truncated = truncate_table_summary(summary, &previous_summary).unwrap();
655
656        assert_eq!(
657            truncated
658                .additional_properties
659                .get(TOTAL_DATA_FILES)
660                .unwrap(),
661            "0"
662        );
663        assert_eq!(
664            truncated
665                .additional_properties
666                .get(TOTAL_DELETE_FILES)
667                .unwrap(),
668            "0"
669        );
670        assert_eq!(
671            truncated.additional_properties.get(TOTAL_RECORDS).unwrap(),
672            "0"
673        );
674        assert_eq!(
675            truncated
676                .additional_properties
677                .get(TOTAL_FILE_SIZE)
678                .unwrap(),
679            "0"
680        );
681        assert_eq!(
682            truncated
683                .additional_properties
684                .get(TOTAL_POSITION_DELETES)
685                .unwrap(),
686            "0"
687        );
688        assert_eq!(
689            truncated
690                .additional_properties
691                .get(TOTAL_EQUALITY_DELETES)
692                .unwrap(),
693            "0"
694        );
695
696        assert_eq!(
697            truncated
698                .additional_properties
699                .get(DELETED_DATA_FILES)
700                .unwrap(),
701            "10"
702        );
703        assert_eq!(
704            truncated
705                .additional_properties
706                .get(REMOVED_DELETE_FILES)
707                .unwrap(),
708            "5"
709        );
710        assert_eq!(
711            truncated
712                .additional_properties
713                .get(DELETED_RECORDS)
714                .unwrap(),
715            "100"
716        );
717        assert_eq!(
718            truncated
719                .additional_properties
720                .get(REMOVED_FILE_SIZE)
721                .unwrap(),
722            "1000"
723        );
724        assert_eq!(
725            truncated
726                .additional_properties
727                .get(REMOVED_POSITION_DELETES)
728                .unwrap(),
729            "3"
730        );
731        assert_eq!(
732            truncated
733                .additional_properties
734                .get(REMOVED_EQUALITY_DELETES)
735                .unwrap(),
736            "2"
737        );
738    }
739
740    #[test]
741    fn test_update_snapshot_summaries_overwrite_truncate_handles_totals_above_i32_max() {
742        // A table can legitimately accumulate more than i32::MAX rows or files
743        // over its lifetime. Truncating such a table on overwrite must succeed
744        // and surface the previous totals into the deleted-* counters.
745        let big = (i32::MAX as u64 + 1).to_string(); // 2_147_483_648
746        let prev_props: HashMap<String, String> = [
747            (TOTAL_DATA_FILES.to_string(), big.clone()),
748            (TOTAL_DELETE_FILES.to_string(), "0".to_string()),
749            (TOTAL_RECORDS.to_string(), big.clone()),
750            (TOTAL_FILE_SIZE.to_string(), "0".to_string()),
751            (TOTAL_POSITION_DELETES.to_string(), "0".to_string()),
752            (TOTAL_EQUALITY_DELETES.to_string(), "0".to_string()),
753        ]
754        .into_iter()
755        .collect();
756
757        let previous_summary = Summary {
758            operation: Operation::Overwrite,
759            additional_properties: prev_props,
760        };
761
762        let summary = Summary {
763            operation: Operation::Overwrite,
764            additional_properties: HashMap::new(),
765        };
766
767        let updated = update_snapshot_summaries(summary, Some(&previous_summary), true)
768            .expect("overwrite truncation should accept totals above i32::MAX");
769        assert_eq!(
770            updated
771                .additional_properties
772                .get(DELETED_DATA_FILES)
773                .unwrap(),
774            &big
775        );
776        assert_eq!(
777            updated.additional_properties.get(DELETED_RECORDS).unwrap(),
778            &big
779        );
780    }
781
782    #[test]
783    fn test_update_snapshot_summaries_overwrite_truncate_returns_err_on_malformed_total() {
784        // Non-numeric values in the previous summary (corruption, manual edits,
785        // a foreign implementation) must surface as a recoverable Err - not
786        // crash the process.
787        let prev_props: HashMap<String, String> = [
788            (TOTAL_DATA_FILES.to_string(), "not_a_number".to_string()),
789            (TOTAL_DELETE_FILES.to_string(), "0".to_string()),
790            (TOTAL_RECORDS.to_string(), "0".to_string()),
791            (TOTAL_FILE_SIZE.to_string(), "0".to_string()),
792            (TOTAL_POSITION_DELETES.to_string(), "0".to_string()),
793            (TOTAL_EQUALITY_DELETES.to_string(), "0".to_string()),
794        ]
795        .into_iter()
796        .collect();
797
798        let previous_summary = Summary {
799            operation: Operation::Overwrite,
800            additional_properties: prev_props,
801        };
802
803        let summary = Summary {
804            operation: Operation::Overwrite,
805            additional_properties: HashMap::new(),
806        };
807
808        let err = update_snapshot_summaries(summary, Some(&previous_summary), true)
809            .expect_err("malformed previous summary must produce an Err, not a panic");
810        assert!(
811            err.message().contains("truncate table summary"),
812            "expected wrapped 'Failed to truncate table summary' context, got: {}",
813            err.message()
814        );
815    }
816
817    #[test]
818    fn test_snapshot_summary_collector_build() {
819        let schema = Arc::new(
820            Schema::builder()
821                .with_fields(vec![
822                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
823                    NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
824                ])
825                .build()
826                .unwrap(),
827        );
828
829        let partition_spec = Arc::new(
830            PartitionSpec::builder(schema.clone())
831                .add_unbound_fields(vec![
832                    UnboundPartitionField::builder()
833                        .source_id(2)
834                        .name("year".to_string())
835                        .transform(Transform::Identity)
836                        .build(),
837                ])
838                .unwrap()
839                .with_spec_id(1)
840                .build()
841                .unwrap(),
842        );
843
844        let mut collector = SnapshotSummaryCollector::default();
845        collector.set_partition_summary_limit(10);
846
847        let file1 = DataFile {
848            content: DataContentType::Data,
849            file_path: "s3://testbucket/path/to/file1.parquet".to_string(),
850            file_format: DataFileFormat::Parquet,
851            partition: Struct::from_iter(vec![]),
852            record_count: 10,
853            file_size_in_bytes: 100,
854            column_sizes: HashMap::from([(1, 46), (2, 48), (3, 48)]),
855            value_counts: HashMap::from([(1, 10), (2, 10), (3, 10)]),
856            null_value_counts: HashMap::from([(1, 0), (2, 0), (3, 0)]),
857            nan_value_counts: HashMap::new(),
858            lower_bounds: HashMap::from([
859                (1, Datum::long(1)),
860                (2, Datum::string("a")),
861                (3, Datum::string("x")),
862            ]),
863            upper_bounds: HashMap::from([
864                (1, Datum::long(1)),
865                (2, Datum::string("a")),
866                (3, Datum::string("x")),
867            ]),
868            key_metadata: None,
869            split_offsets: Some(vec![4]),
870            equality_ids: None,
871            sort_order_id: Some(0),
872            partition_spec_id: 0,
873            first_row_id: None,
874            referenced_data_file: None,
875            content_offset: None,
876            content_size_in_bytes: None,
877        };
878
879        let file2 = DataFile {
880            content: DataContentType::Data,
881            file_path: "s3://testbucket/path/to/file2.parquet".to_string(),
882            file_format: DataFileFormat::Parquet,
883            partition: Struct::from_iter(vec![Some(Literal::string("2025"))]),
884            record_count: 20,
885            file_size_in_bytes: 200,
886            column_sizes: HashMap::from([(1, 46), (2, 48), (3, 48)]),
887            value_counts: HashMap::from([(1, 20), (2, 20), (3, 20)]),
888            null_value_counts: HashMap::from([(1, 0), (2, 0), (3, 0)]),
889            nan_value_counts: HashMap::new(),
890            lower_bounds: HashMap::from([
891                (1, Datum::long(1)),
892                (2, Datum::string("a")),
893                (3, Datum::string("x")),
894            ]),
895            upper_bounds: HashMap::from([
896                (1, Datum::long(1)),
897                (2, Datum::string("a")),
898                (3, Datum::string("x")),
899            ]),
900            key_metadata: None,
901            split_offsets: Some(vec![4]),
902            equality_ids: None,
903            sort_order_id: Some(0),
904            partition_spec_id: 0,
905            first_row_id: None,
906            referenced_data_file: None,
907            content_offset: None,
908            content_size_in_bytes: None,
909        };
910
911        collector.add_file(&file1, schema.clone(), partition_spec.clone());
912        collector.add_file(&file2, schema.clone(), partition_spec.clone());
913
914        collector.remove_file(&file1, schema.clone(), partition_spec.clone());
915
916        let props = collector.build();
917
918        assert_eq!(props.get(ADDED_FILE_SIZE).unwrap(), "300");
919        assert_eq!(props.get(REMOVED_FILE_SIZE).unwrap(), "100");
920
921        let partition_key = format!("{}{}", CHANGED_PARTITION_PREFIX, "year=2025");
922
923        assert!(props.contains_key(&partition_key));
924
925        let partition_summary = props.get(&partition_key).unwrap();
926        assert!(partition_summary.contains(&format!("{ADDED_FILE_SIZE}=200")));
927        assert!(partition_summary.contains(&format!("{ADDED_DATA_FILES}=1")));
928        assert!(partition_summary.contains(&format!("{ADDED_RECORDS}=20")));
929    }
930
931    #[test]
932    fn test_snapshot_summary_collector_add_manifest() {
933        let mut collector = SnapshotSummaryCollector::default();
934        collector.set_partition_summary_limit(10);
935
936        let manifest = ManifestFile {
937            manifest_path: "file://dummy.manifest".to_string(),
938            manifest_length: 0,
939            partition_spec_id: 0,
940            content: ManifestContentType::Data,
941            sequence_number: 0,
942            min_sequence_number: 0,
943            added_snapshot_id: 0,
944            added_files_count: Some(3),
945            existing_files_count: Some(0),
946            deleted_files_count: Some(1),
947            added_rows_count: Some(100),
948            existing_rows_count: Some(0),
949            deleted_rows_count: Some(50),
950            partitions: Some(Vec::new()),
951            key_metadata: None,
952            first_row_id: None,
953        };
954
955        collector
956            .partition_metrics
957            .insert("dummy".to_string(), UpdateMetrics::default());
958        collector.add_manifest(&manifest);
959
960        let props = collector.build();
961        assert_eq!(props.get(ADDED_DATA_FILES).unwrap(), "3");
962        assert_eq!(props.get(DELETED_DATA_FILES).unwrap(), "1");
963        assert_eq!(props.get(ADDED_RECORDS).unwrap(), "100");
964        assert_eq!(props.get(DELETED_RECORDS).unwrap(), "50");
965    }
966
967    #[test]
968    fn test_snapshot_summary_collector_merge() {
969        let schema = Arc::new(
970            Schema::builder()
971                .with_fields(vec![
972                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
973                    NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
974                ])
975                .build()
976                .unwrap(),
977        );
978
979        let partition_spec = Arc::new(
980            PartitionSpec::builder(schema.clone())
981                .add_unbound_fields(vec![
982                    UnboundPartitionField::builder()
983                        .source_id(2)
984                        .name("year".to_string())
985                        .transform(Transform::Identity)
986                        .build(),
987                ])
988                .unwrap()
989                .with_spec_id(1)
990                .build()
991                .unwrap(),
992        );
993
994        let mut summary_one = SnapshotSummaryCollector::default();
995        let mut summary_two = SnapshotSummaryCollector::default();
996
997        summary_one.add_file(
998            &DataFile {
999                content: DataContentType::Data,
1000                file_path: "test.parquet".into(),
1001                file_format: DataFileFormat::Parquet,
1002                partition: Struct::from_iter(vec![]),
1003                record_count: 10,
1004                file_size_in_bytes: 100,
1005                column_sizes: HashMap::new(),
1006                value_counts: HashMap::new(),
1007                null_value_counts: HashMap::new(),
1008                nan_value_counts: HashMap::new(),
1009                lower_bounds: HashMap::new(),
1010                upper_bounds: HashMap::new(),
1011                key_metadata: None,
1012                split_offsets: None,
1013                equality_ids: None,
1014                sort_order_id: None,
1015                partition_spec_id: 0,
1016                first_row_id: None,
1017                referenced_data_file: None,
1018                content_offset: None,
1019                content_size_in_bytes: None,
1020            },
1021            schema.clone(),
1022            partition_spec.clone(),
1023        );
1024
1025        summary_two.add_file(
1026            &DataFile {
1027                content: DataContentType::Data,
1028                file_path: "test.parquet".into(),
1029                file_format: DataFileFormat::Parquet,
1030                partition: Struct::from_iter(vec![]),
1031                record_count: 20,
1032                file_size_in_bytes: 200,
1033                column_sizes: HashMap::new(),
1034                value_counts: HashMap::new(),
1035                null_value_counts: HashMap::new(),
1036                nan_value_counts: HashMap::new(),
1037                lower_bounds: HashMap::new(),
1038                upper_bounds: HashMap::new(),
1039                key_metadata: None,
1040                split_offsets: None,
1041                equality_ids: None,
1042                sort_order_id: None,
1043                partition_spec_id: 0,
1044                first_row_id: None,
1045                referenced_data_file: None,
1046                content_offset: None,
1047                content_size_in_bytes: None,
1048            },
1049            schema.clone(),
1050            partition_spec.clone(),
1051        );
1052
1053        summary_one.merge(summary_two);
1054        let props = summary_one.build();
1055        assert_eq!(props.get(ADDED_DATA_FILES).unwrap(), "2");
1056        assert_eq!(props.get(ADDED_RECORDS).unwrap(), "30");
1057
1058        let mut summary_three = SnapshotSummaryCollector::default();
1059        let mut summary_four = SnapshotSummaryCollector::default();
1060
1061        summary_three.add_manifest(&ManifestFile {
1062            manifest_path: "test.manifest".to_string(),
1063            manifest_length: 0,
1064            partition_spec_id: 0,
1065            content: ManifestContentType::Data,
1066            sequence_number: 0,
1067            min_sequence_number: 0,
1068            added_snapshot_id: 0,
1069            added_files_count: Some(1),
1070            existing_files_count: Some(0),
1071            deleted_files_count: Some(0),
1072            added_rows_count: Some(5),
1073            existing_rows_count: Some(0),
1074            deleted_rows_count: Some(0),
1075            partitions: Some(Vec::new()),
1076            key_metadata: None,
1077            first_row_id: None,
1078        });
1079
1080        summary_four.add_file(
1081            &DataFile {
1082                content: DataContentType::Data,
1083                file_path: "test.parquet".into(),
1084                file_format: DataFileFormat::Parquet,
1085                partition: Struct::from_iter(vec![]),
1086                record_count: 1,
1087                file_size_in_bytes: 10,
1088                column_sizes: HashMap::new(),
1089                value_counts: HashMap::new(),
1090                null_value_counts: HashMap::new(),
1091                nan_value_counts: HashMap::new(),
1092                lower_bounds: HashMap::new(),
1093                upper_bounds: HashMap::new(),
1094                key_metadata: None,
1095                split_offsets: None,
1096                equality_ids: None,
1097                sort_order_id: None,
1098                partition_spec_id: 0,
1099                first_row_id: None,
1100                referenced_data_file: None,
1101                content_offset: None,
1102                content_size_in_bytes: None,
1103            },
1104            schema.clone(),
1105            partition_spec.clone(),
1106        );
1107
1108        summary_three.merge(summary_four);
1109        let props = summary_three.build();
1110
1111        assert_eq!(props.get(ADDED_DATA_FILES).unwrap(), "2");
1112        assert_eq!(props.get(ADDED_RECORDS).unwrap(), "6");
1113        assert!(
1114            props
1115                .iter()
1116                .all(|(k, _)| !k.starts_with(CHANGED_PARTITION_PREFIX))
1117        );
1118    }
1119
1120    #[test]
1121    fn test_update_totals_skipped_when_previous_summary_missing_totals() {
1122        let prev_props: HashMap<String, String> = [(TOTAL_DATA_FILES, "8")]
1123            .into_iter()
1124            .map(|(k, v)| (k.to_string(), v.to_string()))
1125            .collect();
1126
1127        let previous_summary = Summary {
1128            operation: Operation::Overwrite,
1129            additional_properties: prev_props,
1130        };
1131
1132        let new_props: HashMap<String, String> = [
1133            (ADDED_DATA_FILES, "4"),
1134            (ADDED_DELETE_FILES, "2"),
1135            (ADDED_RECORDS, "40"),
1136            (ADDED_FILE_SIZE, "400"),
1137            (ADDED_POSITION_DELETES, "5"),
1138            (ADDED_EQUALITY_DELETES, "3"),
1139        ]
1140        .into_iter()
1141        .map(|(k, v)| (k.to_string(), v.to_string()))
1142        .collect();
1143
1144        let summary = Summary {
1145            operation: Operation::Append,
1146            additional_properties: new_props,
1147        };
1148
1149        let updated = update_snapshot_summaries(summary, Some(&previous_summary), false).unwrap();
1150        let props = &updated.additional_properties;
1151
1152        assert_eq!(props.get(TOTAL_DATA_FILES).unwrap(), "12");
1153
1154        for total_field in [
1155            TOTAL_DELETE_FILES,
1156            TOTAL_RECORDS,
1157            TOTAL_FILE_SIZE,
1158            TOTAL_POSITION_DELETES,
1159            TOTAL_EQUALITY_DELETES,
1160        ] {
1161            assert!(
1162                !props.contains_key(total_field),
1163                "{total_field} should not be set when previous summary lacks it",
1164            );
1165        }
1166    }
1167
1168    #[test]
1169    fn test_update_totals_tolerates_unparsable_added_value() {
1170        // A non-integer added value (which can survive in a previous snapshot's
1171        // summary) must not panic the commit. Matching iceberg-java's `updateTotal`
1172        // try/catch, the affected total is skipped while other totals still compute.
1173        let prev_props: HashMap<String, String> = [(TOTAL_DATA_FILES, "8"), (TOTAL_RECORDS, "80")]
1174            .into_iter()
1175            .map(|(k, v)| (k.to_string(), v.to_string()))
1176            .collect();
1177
1178        let previous_summary = Summary {
1179            operation: Operation::Append,
1180            additional_properties: prev_props,
1181        };
1182
1183        let new_props: HashMap<String, String> =
1184            [(ADDED_DATA_FILES, "not-a-number"), (ADDED_RECORDS, "40")]
1185                .into_iter()
1186                .map(|(k, v)| (k.to_string(), v.to_string()))
1187                .collect();
1188
1189        let summary = Summary {
1190            operation: Operation::Append,
1191            additional_properties: new_props,
1192        };
1193
1194        // Must not panic.
1195        let updated = update_snapshot_summaries(summary, Some(&previous_summary), false).unwrap();
1196        let props = &updated.additional_properties;
1197
1198        // The total whose added delta was unparsable is skipped...
1199        assert!(
1200            !props.contains_key(TOTAL_DATA_FILES),
1201            "TOTAL_DATA_FILES should be skipped when its added value is unparsable",
1202        );
1203        // ...while a sibling total with valid deltas still computes.
1204        assert_eq!(props.get(TOTAL_RECORDS).unwrap(), "120");
1205    }
1206
1207    #[test]
1208    fn test_update_totals_computed_when_no_previous_summary() {
1209        let new_props: HashMap<String, String> = [
1210            (ADDED_DATA_FILES, "4"),
1211            (ADDED_RECORDS, "40"),
1212            (ADDED_FILE_SIZE, "400"),
1213        ]
1214        .into_iter()
1215        .map(|(k, v)| (k.to_string(), v.to_string()))
1216        .collect();
1217
1218        let summary = Summary {
1219            operation: Operation::Append,
1220            additional_properties: new_props,
1221        };
1222
1223        let updated = update_snapshot_summaries(summary, None, false).unwrap();
1224        let props = &updated.additional_properties;
1225
1226        assert_eq!(props.get(TOTAL_DATA_FILES).unwrap(), "4");
1227        assert_eq!(props.get(TOTAL_RECORDS).unwrap(), "40");
1228        assert_eq!(props.get(TOTAL_FILE_SIZE).unwrap(), "400");
1229    }
1230
1231    #[test]
1232    fn test_update_totals_with_removes_only() {
1233        let prev_props: HashMap<String, String> = [
1234            (TOTAL_DATA_FILES, "10"),
1235            (TOTAL_DELETE_FILES, "5"),
1236            (TOTAL_RECORDS, "100"),
1237            (TOTAL_FILE_SIZE, "1000"),
1238            (TOTAL_POSITION_DELETES, "3"),
1239            (TOTAL_EQUALITY_DELETES, "2"),
1240        ]
1241        .into_iter()
1242        .map(|(k, v)| (k.to_string(), v.to_string()))
1243        .collect();
1244
1245        let previous_summary = Summary {
1246            operation: Operation::Overwrite,
1247            additional_properties: prev_props,
1248        };
1249
1250        let new_props: HashMap<String, String> = [
1251            (DELETED_DATA_FILES, "2"),
1252            (REMOVED_DELETE_FILES, "1"),
1253            (DELETED_RECORDS, "20"),
1254            (REMOVED_FILE_SIZE, "200"),
1255            (REMOVED_POSITION_DELETES, "1"),
1256            (REMOVED_EQUALITY_DELETES, "1"),
1257        ]
1258        .into_iter()
1259        .map(|(k, v)| (k.to_string(), v.to_string()))
1260        .collect();
1261
1262        let summary = Summary {
1263            operation: Operation::Delete,
1264            additional_properties: new_props,
1265        };
1266
1267        let updated = update_snapshot_summaries(summary, Some(&previous_summary), false).unwrap();
1268        let props = &updated.additional_properties;
1269
1270        assert_eq!(props.get(TOTAL_DATA_FILES).unwrap(), "8");
1271        assert_eq!(props.get(TOTAL_DELETE_FILES).unwrap(), "4");
1272        assert_eq!(props.get(TOTAL_RECORDS).unwrap(), "80");
1273        assert_eq!(props.get(TOTAL_FILE_SIZE).unwrap(), "800");
1274        assert_eq!(props.get(TOTAL_POSITION_DELETES).unwrap(), "2");
1275        assert_eq!(props.get(TOTAL_EQUALITY_DELETES).unwrap(), "1");
1276    }
1277}