iceberg/spec/
snapshot_summary.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19
20use itertools::Itertools;
21
22use super::{DataContentType, DataFile, PartitionSpecRef};
23use crate::spec::{ManifestContentType, ManifestFile, Operation, SchemaRef, Summary};
24use crate::{Error, ErrorKind, Result};
25
// Keys written into snapshot summaries. The string values end up in persisted
// table metadata, so they must not be changed.

// Data files (content type `Data`).
const ADDED_DATA_FILES: &str = "added-data-files";
const DELETED_DATA_FILES: &str = "deleted-data-files";
const TOTAL_DATA_FILES: &str = "total-data-files";

// Rows contained in data files.
const ADDED_RECORDS: &str = "added-records";
const DELETED_RECORDS: &str = "deleted-records";
const TOTAL_RECORDS: &str = "total-records";

// Accumulated `file_size_in_bytes` of all added/removed files.
const ADDED_FILE_SIZE: &str = "added-files-size";
const REMOVED_FILE_SIZE: &str = "removed-files-size";
const TOTAL_FILE_SIZE: &str = "total-files-size";

// Delete files of either kind (position or equality).
const ADDED_DELETE_FILES: &str = "added-delete-files";
const REMOVED_DELETE_FILES: &str = "removed-delete-files";
const TOTAL_DELETE_FILES: &str = "total-delete-files";

// Delete files, split by kind.
const ADDED_POSITION_DELETE_FILES: &str = "added-position-delete-files";
const REMOVED_POSITION_DELETE_FILES: &str = "removed-position-delete-files";
const ADDED_EQUALITY_DELETE_FILES: &str = "added-equality-delete-files";
const REMOVED_EQUALITY_DELETE_FILES: &str = "removed-equality-delete-files";

// Records contained in position delete files.
const ADDED_POSITION_DELETES: &str = "added-position-deletes";
const REMOVED_POSITION_DELETES: &str = "removed-position-deletes";
const TOTAL_POSITION_DELETES: &str = "total-position-deletes";

// Records contained in equality delete files.
const ADDED_EQUALITY_DELETES: &str = "added-equality-deletes";
const REMOVED_EQUALITY_DELETES: &str = "removed-equality-deletes";
const TOTAL_EQUALITY_DELETES: &str = "total-equality-deletes";

// Partition-level reporting: the number of changed partitions, and the key
// prefix for the per-partition summary entries (`partitions.<path>`).
const CHANGED_PARTITION_COUNT_PROP: &str = "changed-partition-count";
const CHANGED_PARTITION_PREFIX: &str = "partitions.";
50
51/// `SnapshotSummaryCollector` collects and aggregates snapshot update metrics.
52/// It gathers metrics about added or removed data files and manifests, and tracks
53/// partition-specific updates.
54#[derive(Default)]
55pub struct SnapshotSummaryCollector {
56    metrics: UpdateMetrics,
57    partition_metrics: HashMap<String, UpdateMetrics>,
58    max_changed_partitions_for_summaries: u64,
59    properties: HashMap<String, String>,
60    trust_partition_metrics: bool,
61}
62
63impl SnapshotSummaryCollector {
64    /// Set properties for snapshot summary
65    pub fn set(&mut self, key: &str, value: &str) {
66        self.properties.insert(key.to_string(), value.to_string());
67    }
68
69    /// Sets the limit for including partition summaries. Summaries are not
70    /// included if the number of partitions is exceeded.
71    pub fn set_partition_summary_limit(&mut self, limit: u64) {
72        self.max_changed_partitions_for_summaries = limit;
73    }
74
75    /// Adds a data file to the summary collector
76    pub fn add_file(
77        &mut self,
78        data_file: &DataFile,
79        schema: SchemaRef,
80        partition_spec: PartitionSpecRef,
81    ) {
82        self.metrics.add_file(data_file);
83        if !data_file.partition.fields().is_empty() {
84            self.update_partition_metrics(schema, partition_spec, data_file, true);
85        }
86    }
87
88    /// Removes a data file from the summary collector
89    pub fn remove_file(
90        &mut self,
91        data_file: &DataFile,
92        schema: SchemaRef,
93        partition_spec: PartitionSpecRef,
94    ) {
95        self.metrics.remove_file(data_file);
96        if !data_file.partition.fields().is_empty() {
97            self.update_partition_metrics(schema, partition_spec, data_file, false);
98        }
99    }
100
101    /// Adds a manifest to the summary collector
102    pub fn add_manifest(&mut self, manifest: &ManifestFile) {
103        self.trust_partition_metrics = false;
104        self.partition_metrics.clear();
105        self.metrics.add_manifest(manifest);
106    }
107
108    /// Updates partition-specific metrics for a data file.
109    pub fn update_partition_metrics(
110        &mut self,
111        schema: SchemaRef,
112        partition_spec: PartitionSpecRef,
113        data_file: &DataFile,
114        is_add_file: bool,
115    ) {
116        let partition_path = partition_spec.partition_to_path(&data_file.partition, schema);
117        let metrics = self.partition_metrics.entry(partition_path).or_default();
118
119        if is_add_file {
120            metrics.add_file(data_file);
121        } else {
122            metrics.remove_file(data_file);
123        }
124    }
125
126    /// Merges another `SnapshotSummaryCollector` into the current one
127    pub fn merge(&mut self, summary: SnapshotSummaryCollector) {
128        self.metrics.merge(&summary.metrics);
129        self.properties.extend(summary.properties);
130
131        if self.trust_partition_metrics && summary.trust_partition_metrics {
132            for (partition, partition_metric) in summary.partition_metrics.iter() {
133                self.partition_metrics
134                    .entry(partition.to_string())
135                    .or_default()
136                    .merge(partition_metric);
137            }
138        } else {
139            self.partition_metrics.clear();
140            self.trust_partition_metrics = false;
141        }
142    }
143
144    /// Builds final map of summaries
145    pub fn build(&self) -> HashMap<String, String> {
146        let mut properties = self.metrics.to_map();
147        let changed_partitions_count = self.partition_metrics.len() as u64;
148        set_if_positive(
149            &mut properties,
150            changed_partitions_count,
151            CHANGED_PARTITION_COUNT_PROP,
152        );
153
154        if changed_partitions_count <= self.max_changed_partitions_for_summaries {
155            for (partition_path, update_metrics_partition) in &self.partition_metrics {
156                let property_key = format!("{CHANGED_PARTITION_PREFIX}{partition_path}");
157                let partition_summary = update_metrics_partition
158                    .to_map()
159                    .into_iter()
160                    .map(|(property, value)| format!("{property}={value}"))
161                    .join(",");
162
163                if !partition_summary.is_empty() {
164                    properties.insert(property_key, partition_summary);
165                }
166            }
167        }
168        properties
169    }
170}
171
172#[derive(Debug, Default)]
173struct UpdateMetrics {
174    added_file_size: u64,
175    removed_file_size: u64,
176    added_data_files: u32,
177    removed_data_files: u32,
178    added_eq_delete_files: u64,
179    removed_eq_delete_files: u64,
180    added_pos_delete_files: u64,
181    removed_pos_delete_files: u64,
182    added_delete_files: u32,
183    removed_delete_files: u32,
184    added_records: u64,
185    deleted_records: u64,
186    added_pos_deletes: u64,
187    removed_pos_deletes: u64,
188    added_eq_deletes: u64,
189    removed_eq_deletes: u64,
190}
191
192impl UpdateMetrics {
193    fn add_file(&mut self, data_file: &DataFile) {
194        self.added_file_size += data_file.file_size_in_bytes;
195        match data_file.content_type() {
196            DataContentType::Data => {
197                self.added_data_files += 1;
198                self.added_records += data_file.record_count;
199            }
200            DataContentType::PositionDeletes => {
201                self.added_delete_files += 1;
202                self.added_pos_delete_files += 1;
203                self.added_pos_deletes += data_file.record_count;
204            }
205            DataContentType::EqualityDeletes => {
206                self.added_delete_files += 1;
207                self.added_eq_delete_files += 1;
208                self.added_eq_deletes += data_file.record_count;
209            }
210        }
211    }
212
213    fn remove_file(&mut self, data_file: &DataFile) {
214        self.removed_file_size += data_file.file_size_in_bytes;
215        match data_file.content_type() {
216            DataContentType::Data => {
217                self.removed_data_files += 1;
218                self.deleted_records += data_file.record_count;
219            }
220            DataContentType::PositionDeletes => {
221                self.removed_delete_files += 1;
222                self.removed_pos_delete_files += 1;
223                self.removed_pos_deletes += data_file.record_count;
224            }
225            DataContentType::EqualityDeletes => {
226                self.removed_delete_files += 1;
227                self.removed_eq_delete_files += 1;
228                self.removed_eq_deletes += data_file.record_count;
229            }
230        }
231    }
232
233    fn add_manifest(&mut self, manifest: &ManifestFile) {
234        match manifest.content {
235            ManifestContentType::Data => {
236                self.added_data_files += manifest.added_files_count.unwrap_or(0);
237                self.added_records += manifest.added_rows_count.unwrap_or(0);
238                self.removed_data_files += manifest.deleted_files_count.unwrap_or(0);
239                self.deleted_records += manifest.deleted_rows_count.unwrap_or(0);
240            }
241            ManifestContentType::Deletes => {
242                self.added_delete_files += manifest.added_files_count.unwrap_or(0);
243                self.removed_delete_files += manifest.deleted_files_count.unwrap_or(0);
244            }
245        }
246    }
247
248    fn to_map(&self) -> HashMap<String, String> {
249        let mut properties = HashMap::new();
250        set_if_positive(&mut properties, self.added_file_size, ADDED_FILE_SIZE);
251        set_if_positive(&mut properties, self.removed_file_size, REMOVED_FILE_SIZE);
252        set_if_positive(&mut properties, self.added_data_files, ADDED_DATA_FILES);
253        set_if_positive(&mut properties, self.removed_data_files, DELETED_DATA_FILES);
254        set_if_positive(
255            &mut properties,
256            self.added_eq_delete_files,
257            ADDED_EQUALITY_DELETE_FILES,
258        );
259        set_if_positive(
260            &mut properties,
261            self.removed_eq_delete_files,
262            REMOVED_EQUALITY_DELETE_FILES,
263        );
264        set_if_positive(
265            &mut properties,
266            self.added_pos_delete_files,
267            ADDED_POSITION_DELETE_FILES,
268        );
269        set_if_positive(
270            &mut properties,
271            self.removed_pos_delete_files,
272            REMOVED_POSITION_DELETE_FILES,
273        );
274        set_if_positive(&mut properties, self.added_delete_files, ADDED_DELETE_FILES);
275        set_if_positive(
276            &mut properties,
277            self.removed_delete_files,
278            REMOVED_DELETE_FILES,
279        );
280        set_if_positive(&mut properties, self.added_records, ADDED_RECORDS);
281        set_if_positive(&mut properties, self.deleted_records, DELETED_RECORDS);
282        set_if_positive(
283            &mut properties,
284            self.added_pos_deletes,
285            ADDED_POSITION_DELETES,
286        );
287        set_if_positive(
288            &mut properties,
289            self.removed_pos_deletes,
290            REMOVED_POSITION_DELETES,
291        );
292        set_if_positive(
293            &mut properties,
294            self.added_eq_deletes,
295            ADDED_EQUALITY_DELETES,
296        );
297        set_if_positive(
298            &mut properties,
299            self.removed_eq_deletes,
300            REMOVED_EQUALITY_DELETES,
301        );
302        properties
303    }
304
305    fn merge(&mut self, other: &UpdateMetrics) {
306        self.added_file_size += other.added_file_size;
307        self.removed_file_size += other.removed_file_size;
308        self.added_data_files += other.added_data_files;
309        self.removed_data_files += other.removed_data_files;
310        self.added_eq_delete_files += other.added_eq_delete_files;
311        self.removed_eq_delete_files += other.removed_eq_delete_files;
312        self.added_pos_delete_files += other.added_pos_delete_files;
313        self.removed_pos_delete_files += other.removed_pos_delete_files;
314        self.added_delete_files += other.added_delete_files;
315        self.removed_delete_files += other.removed_delete_files;
316        self.added_records += other.added_records;
317        self.deleted_records += other.deleted_records;
318        self.added_pos_deletes += other.added_pos_deletes;
319        self.removed_pos_deletes += other.removed_pos_deletes;
320        self.added_eq_deletes += other.added_eq_deletes;
321        self.removed_eq_deletes += other.removed_eq_deletes;
322    }
323}
324
/// Stores `value` (via `ToString`) under `property_name` only when it compares
/// strictly greater than `T::default()`, i.e. zero for the integer counters
/// used in this module. Otherwise `properties` is left untouched.
fn set_if_positive<T>(properties: &mut HashMap<String, String>, value: T, property_name: &str)
where T: PartialOrd + Default + ToString {
    let should_record = value > T::default();
    if should_record {
        let rendered = value.to_string();
        properties.insert(property_name.to_owned(), rendered);
    }
}
331
332#[allow(dead_code)]
333pub(crate) fn update_snapshot_summaries(
334    summary: Summary,
335    previous_summary: Option<&Summary>,
336    truncate_full_table: bool,
337) -> Result<Summary> {
338    // Validate that the operation is supported
339    if summary.operation != Operation::Append
340        && summary.operation != Operation::Overwrite
341        && summary.operation != Operation::Delete
342    {
343        return Err(Error::new(
344            ErrorKind::DataInvalid,
345            "Operation is not supported.",
346        ));
347    }
348
349    let mut summary = match previous_summary {
350        Some(prev_summary) if truncate_full_table && summary.operation == Operation::Overwrite => {
351            truncate_table_summary(summary, prev_summary)
352                .map_err(|err| {
353                    Error::new(ErrorKind::Unexpected, "Failed to truncate table summary.")
354                        .with_source(err)
355                })
356                .unwrap()
357        }
358        _ => summary,
359    };
360
361    update_totals(
362        &mut summary,
363        previous_summary,
364        TOTAL_DATA_FILES,
365        ADDED_DATA_FILES,
366        DELETED_DATA_FILES,
367    );
368
369    update_totals(
370        &mut summary,
371        previous_summary,
372        TOTAL_DELETE_FILES,
373        ADDED_DELETE_FILES,
374        REMOVED_DELETE_FILES,
375    );
376
377    update_totals(
378        &mut summary,
379        previous_summary,
380        TOTAL_RECORDS,
381        ADDED_RECORDS,
382        DELETED_RECORDS,
383    );
384
385    update_totals(
386        &mut summary,
387        previous_summary,
388        TOTAL_FILE_SIZE,
389        ADDED_FILE_SIZE,
390        REMOVED_FILE_SIZE,
391    );
392
393    update_totals(
394        &mut summary,
395        previous_summary,
396        TOTAL_POSITION_DELETES,
397        ADDED_POSITION_DELETES,
398        REMOVED_POSITION_DELETES,
399    );
400
401    update_totals(
402        &mut summary,
403        previous_summary,
404        TOTAL_EQUALITY_DELETES,
405        ADDED_EQUALITY_DELETES,
406        REMOVED_EQUALITY_DELETES,
407    );
408    Ok(summary)
409}
410
411#[allow(dead_code)]
412fn get_prop(previous_summary: &Summary, prop: &str) -> Result<i32> {
413    let value_str = previous_summary
414        .additional_properties
415        .get(prop)
416        .map(String::as_str)
417        .unwrap_or("0");
418    value_str.parse::<i32>().map_err(|err| {
419        Error::new(
420            ErrorKind::Unexpected,
421            "Failed to parse value from previous summary property.",
422        )
423        .with_source(err)
424    })
425}
426
427#[allow(dead_code)]
428fn truncate_table_summary(mut summary: Summary, previous_summary: &Summary) -> Result<Summary> {
429    for prop in [
430        TOTAL_DATA_FILES,
431        TOTAL_DELETE_FILES,
432        TOTAL_RECORDS,
433        TOTAL_FILE_SIZE,
434        TOTAL_POSITION_DELETES,
435        TOTAL_EQUALITY_DELETES,
436    ] {
437        summary
438            .additional_properties
439            .insert(prop.to_string(), "0".to_string());
440    }
441
442    let value = get_prop(previous_summary, TOTAL_DATA_FILES)?;
443    if value != 0 {
444        summary
445            .additional_properties
446            .insert(DELETED_DATA_FILES.to_string(), value.to_string());
447    }
448    let value = get_prop(previous_summary, TOTAL_DELETE_FILES)?;
449    if value != 0 {
450        summary
451            .additional_properties
452            .insert(REMOVED_DELETE_FILES.to_string(), value.to_string());
453    }
454    let value = get_prop(previous_summary, TOTAL_RECORDS)?;
455    if value != 0 {
456        summary
457            .additional_properties
458            .insert(DELETED_RECORDS.to_string(), value.to_string());
459    }
460    let value = get_prop(previous_summary, TOTAL_FILE_SIZE)?;
461    if value != 0 {
462        summary
463            .additional_properties
464            .insert(REMOVED_FILE_SIZE.to_string(), value.to_string());
465    }
466
467    let value = get_prop(previous_summary, TOTAL_POSITION_DELETES)?;
468    if value != 0 {
469        summary
470            .additional_properties
471            .insert(REMOVED_POSITION_DELETES.to_string(), value.to_string());
472    }
473
474    let value = get_prop(previous_summary, TOTAL_EQUALITY_DELETES)?;
475    if value != 0 {
476        summary
477            .additional_properties
478            .insert(REMOVED_EQUALITY_DELETES.to_string(), value.to_string());
479    }
480
481    Ok(summary)
482}
483
484#[allow(dead_code)]
485fn update_totals(
486    summary: &mut Summary,
487    previous_summary: Option<&Summary>,
488    total_property: &str,
489    added_property: &str,
490    removed_property: &str,
491) {
492    let previous_total = previous_summary.map_or(0, |previous_summary| {
493        previous_summary
494            .additional_properties
495            .get(total_property)
496            .map_or(0, |value| value.parse::<u64>().unwrap())
497    });
498
499    let mut new_total = previous_total;
500    if let Some(value) = summary
501        .additional_properties
502        .get(added_property)
503        .map(|value| value.parse::<u64>().unwrap())
504    {
505        new_total += value;
506    }
507    if let Some(value) = summary
508        .additional_properties
509        .get(removed_property)
510        .map(|value| value.parse::<u64>().unwrap())
511    {
512        new_total -= value;
513    }
514    summary
515        .additional_properties
516        .insert(total_property.to_string(), new_total.to_string());
517}
518
519#[cfg(test)]
520mod tests {
521    use std::collections::HashMap;
522    use std::sync::Arc;
523
524    use super::*;
525    use crate::spec::{
526        DataFileFormat, Datum, Literal, NestedField, PartitionSpec, PrimitiveType, Schema, Struct,
527        Transform, Type, UnboundPartitionField,
528    };
529
530    #[test]
531    fn test_update_snapshot_summaries_append() {
532        let prev_props: HashMap<String, String> = [
533            (TOTAL_DATA_FILES.to_string(), "10".to_string()),
534            (TOTAL_DELETE_FILES.to_string(), "5".to_string()),
535            (TOTAL_RECORDS.to_string(), "100".to_string()),
536            (TOTAL_FILE_SIZE.to_string(), "1000".to_string()),
537            (TOTAL_POSITION_DELETES.to_string(), "3".to_string()),
538            (TOTAL_EQUALITY_DELETES.to_string(), "2".to_string()),
539        ]
540        .into_iter()
541        .collect();
542
543        let previous_summary = Summary {
544            operation: Operation::Append,
545            additional_properties: prev_props,
546        };
547
548        let new_props: HashMap<String, String> = [
549            (ADDED_DATA_FILES.to_string(), "4".to_string()),
550            (DELETED_DATA_FILES.to_string(), "1".to_string()),
551            (ADDED_DELETE_FILES.to_string(), "2".to_string()),
552            (REMOVED_DELETE_FILES.to_string(), "1".to_string()),
553            (ADDED_RECORDS.to_string(), "40".to_string()),
554            (DELETED_RECORDS.to_string(), "10".to_string()),
555            (ADDED_FILE_SIZE.to_string(), "400".to_string()),
556            (REMOVED_FILE_SIZE.to_string(), "100".to_string()),
557            (ADDED_POSITION_DELETES.to_string(), "5".to_string()),
558            (REMOVED_POSITION_DELETES.to_string(), "2".to_string()),
559            (ADDED_EQUALITY_DELETES.to_string(), "3".to_string()),
560            (REMOVED_EQUALITY_DELETES.to_string(), "1".to_string()),
561        ]
562        .into_iter()
563        .collect();
564
565        let summary = Summary {
566            operation: Operation::Append,
567            additional_properties: new_props,
568        };
569
570        let updated = update_snapshot_summaries(summary, Some(&previous_summary), false).unwrap();
571
572        assert_eq!(
573            updated.additional_properties.get(TOTAL_DATA_FILES).unwrap(),
574            "13"
575        );
576        assert_eq!(
577            updated
578                .additional_properties
579                .get(TOTAL_DELETE_FILES)
580                .unwrap(),
581            "6"
582        );
583        assert_eq!(
584            updated.additional_properties.get(TOTAL_RECORDS).unwrap(),
585            "130"
586        );
587        assert_eq!(
588            updated.additional_properties.get(TOTAL_FILE_SIZE).unwrap(),
589            "1300"
590        );
591        assert_eq!(
592            updated
593                .additional_properties
594                .get(TOTAL_POSITION_DELETES)
595                .unwrap(),
596            "6"
597        );
598        assert_eq!(
599            updated
600                .additional_properties
601                .get(TOTAL_EQUALITY_DELETES)
602                .unwrap(),
603            "4"
604        );
605    }
606
607    #[test]
608    fn test_truncate_table_summary() {
609        let prev_props: HashMap<String, String> = [
610            (TOTAL_DATA_FILES.to_string(), "10".to_string()),
611            (TOTAL_DELETE_FILES.to_string(), "5".to_string()),
612            (TOTAL_RECORDS.to_string(), "100".to_string()),
613            (TOTAL_FILE_SIZE.to_string(), "1000".to_string()),
614            (TOTAL_POSITION_DELETES.to_string(), "3".to_string()),
615            (TOTAL_EQUALITY_DELETES.to_string(), "2".to_string()),
616        ]
617        .into_iter()
618        .collect();
619
620        let previous_summary = Summary {
621            operation: Operation::Overwrite,
622            additional_properties: prev_props,
623        };
624
625        let mut new_props = HashMap::new();
626        new_props.insert("dummy".to_string(), "value".to_string());
627        let summary = Summary {
628            operation: Operation::Overwrite,
629            additional_properties: new_props,
630        };
631
632        let truncated = truncate_table_summary(summary, &previous_summary).unwrap();
633
634        assert_eq!(
635            truncated
636                .additional_properties
637                .get(TOTAL_DATA_FILES)
638                .unwrap(),
639            "0"
640        );
641        assert_eq!(
642            truncated
643                .additional_properties
644                .get(TOTAL_DELETE_FILES)
645                .unwrap(),
646            "0"
647        );
648        assert_eq!(
649            truncated.additional_properties.get(TOTAL_RECORDS).unwrap(),
650            "0"
651        );
652        assert_eq!(
653            truncated
654                .additional_properties
655                .get(TOTAL_FILE_SIZE)
656                .unwrap(),
657            "0"
658        );
659        assert_eq!(
660            truncated
661                .additional_properties
662                .get(TOTAL_POSITION_DELETES)
663                .unwrap(),
664            "0"
665        );
666        assert_eq!(
667            truncated
668                .additional_properties
669                .get(TOTAL_EQUALITY_DELETES)
670                .unwrap(),
671            "0"
672        );
673
674        assert_eq!(
675            truncated
676                .additional_properties
677                .get(DELETED_DATA_FILES)
678                .unwrap(),
679            "10"
680        );
681        assert_eq!(
682            truncated
683                .additional_properties
684                .get(REMOVED_DELETE_FILES)
685                .unwrap(),
686            "5"
687        );
688        assert_eq!(
689            truncated
690                .additional_properties
691                .get(DELETED_RECORDS)
692                .unwrap(),
693            "100"
694        );
695        assert_eq!(
696            truncated
697                .additional_properties
698                .get(REMOVED_FILE_SIZE)
699                .unwrap(),
700            "1000"
701        );
702        assert_eq!(
703            truncated
704                .additional_properties
705                .get(REMOVED_POSITION_DELETES)
706                .unwrap(),
707            "3"
708        );
709        assert_eq!(
710            truncated
711                .additional_properties
712                .get(REMOVED_EQUALITY_DELETES)
713                .unwrap(),
714            "2"
715        );
716    }
717
718    #[test]
719    fn test_snapshot_summary_collector_build() {
720        let schema = Arc::new(
721            Schema::builder()
722                .with_fields(vec![
723                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
724                    NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
725                ])
726                .build()
727                .unwrap(),
728        );
729
730        let partition_spec = Arc::new(
731            PartitionSpec::builder(schema.clone())
732                .add_unbound_fields(vec![
733                    UnboundPartitionField::builder()
734                        .source_id(2)
735                        .name("year".to_string())
736                        .transform(Transform::Identity)
737                        .build(),
738                ])
739                .unwrap()
740                .with_spec_id(1)
741                .build()
742                .unwrap(),
743        );
744
745        let mut collector = SnapshotSummaryCollector::default();
746        collector.set_partition_summary_limit(10);
747
748        let file1 = DataFile {
749            content: DataContentType::Data,
750            file_path: "s3://testbucket/path/to/file1.parquet".to_string(),
751            file_format: DataFileFormat::Parquet,
752            partition: Struct::from_iter(vec![]),
753            record_count: 10,
754            file_size_in_bytes: 100,
755            column_sizes: HashMap::from([(1, 46), (2, 48), (3, 48)]),
756            value_counts: HashMap::from([(1, 10), (2, 10), (3, 10)]),
757            null_value_counts: HashMap::from([(1, 0), (2, 0), (3, 0)]),
758            nan_value_counts: HashMap::new(),
759            lower_bounds: HashMap::from([
760                (1, Datum::long(1)),
761                (2, Datum::string("a")),
762                (3, Datum::string("x")),
763            ]),
764            upper_bounds: HashMap::from([
765                (1, Datum::long(1)),
766                (2, Datum::string("a")),
767                (3, Datum::string("x")),
768            ]),
769            key_metadata: None,
770            split_offsets: vec![4],
771            equality_ids: None,
772            sort_order_id: Some(0),
773            partition_spec_id: 0,
774            first_row_id: None,
775            referenced_data_file: None,
776            content_offset: None,
777            content_size_in_bytes: None,
778        };
779
780        let file2 = DataFile {
781            content: DataContentType::Data,
782            file_path: "s3://testbucket/path/to/file2.parquet".to_string(),
783            file_format: DataFileFormat::Parquet,
784            partition: Struct::from_iter(vec![Some(Literal::string("2025"))]),
785            record_count: 20,
786            file_size_in_bytes: 200,
787            column_sizes: HashMap::from([(1, 46), (2, 48), (3, 48)]),
788            value_counts: HashMap::from([(1, 20), (2, 20), (3, 20)]),
789            null_value_counts: HashMap::from([(1, 0), (2, 0), (3, 0)]),
790            nan_value_counts: HashMap::new(),
791            lower_bounds: HashMap::from([
792                (1, Datum::long(1)),
793                (2, Datum::string("a")),
794                (3, Datum::string("x")),
795            ]),
796            upper_bounds: HashMap::from([
797                (1, Datum::long(1)),
798                (2, Datum::string("a")),
799                (3, Datum::string("x")),
800            ]),
801            key_metadata: None,
802            split_offsets: vec![4],
803            equality_ids: None,
804            sort_order_id: Some(0),
805            partition_spec_id: 0,
806            first_row_id: None,
807            referenced_data_file: None,
808            content_offset: None,
809            content_size_in_bytes: None,
810        };
811
812        collector.add_file(&file1, schema.clone(), partition_spec.clone());
813        collector.add_file(&file2, schema.clone(), partition_spec.clone());
814
815        collector.remove_file(&file1, schema.clone(), partition_spec.clone());
816
817        let props = collector.build();
818
819        assert_eq!(props.get(ADDED_FILE_SIZE).unwrap(), "300");
820        assert_eq!(props.get(REMOVED_FILE_SIZE).unwrap(), "100");
821
822        let partition_key = format!("{}{}", CHANGED_PARTITION_PREFIX, "year=2025");
823
824        assert!(props.contains_key(&partition_key));
825
826        let partition_summary = props.get(&partition_key).unwrap();
827        assert!(partition_summary.contains(&format!("{ADDED_FILE_SIZE}=200")));
828        assert!(partition_summary.contains(&format!("{ADDED_DATA_FILES}=1")));
829        assert!(partition_summary.contains(&format!("{ADDED_RECORDS}=20")));
830    }
831
832    #[test]
833    fn test_snapshot_summary_collector_add_manifest() {
834        let mut collector = SnapshotSummaryCollector::default();
835        collector.set_partition_summary_limit(10);
836
837        let manifest = ManifestFile {
838            manifest_path: "file://dummy.manifest".to_string(),
839            manifest_length: 0,
840            partition_spec_id: 0,
841            content: ManifestContentType::Data,
842            sequence_number: 0,
843            min_sequence_number: 0,
844            added_snapshot_id: 0,
845            added_files_count: Some(3),
846            existing_files_count: Some(0),
847            deleted_files_count: Some(1),
848            added_rows_count: Some(100),
849            existing_rows_count: Some(0),
850            deleted_rows_count: Some(50),
851            partitions: Some(Vec::new()),
852            key_metadata: None,
853            first_row_id: None,
854        };
855
856        collector
857            .partition_metrics
858            .insert("dummy".to_string(), UpdateMetrics::default());
859        collector.add_manifest(&manifest);
860
861        let props = collector.build();
862        assert_eq!(props.get(ADDED_DATA_FILES).unwrap(), "3");
863        assert_eq!(props.get(DELETED_DATA_FILES).unwrap(), "1");
864        assert_eq!(props.get(ADDED_RECORDS).unwrap(), "100");
865        assert_eq!(props.get(DELETED_RECORDS).unwrap(), "50");
866    }
867
868    #[test]
869    fn test_snapshot_summary_collector_merge() {
870        let schema = Arc::new(
871            Schema::builder()
872                .with_fields(vec![
873                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
874                    NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
875                ])
876                .build()
877                .unwrap(),
878        );
879
880        let partition_spec = Arc::new(
881            PartitionSpec::builder(schema.clone())
882                .add_unbound_fields(vec![
883                    UnboundPartitionField::builder()
884                        .source_id(2)
885                        .name("year".to_string())
886                        .transform(Transform::Identity)
887                        .build(),
888                ])
889                .unwrap()
890                .with_spec_id(1)
891                .build()
892                .unwrap(),
893        );
894
895        let mut summary_one = SnapshotSummaryCollector::default();
896        let mut summary_two = SnapshotSummaryCollector::default();
897
898        summary_one.add_file(
899            &DataFile {
900                content: DataContentType::Data,
901                file_path: "test.parquet".into(),
902                file_format: DataFileFormat::Parquet,
903                partition: Struct::from_iter(vec![]),
904                record_count: 10,
905                file_size_in_bytes: 100,
906                column_sizes: HashMap::new(),
907                value_counts: HashMap::new(),
908                null_value_counts: HashMap::new(),
909                nan_value_counts: HashMap::new(),
910                lower_bounds: HashMap::new(),
911                upper_bounds: HashMap::new(),
912                key_metadata: None,
913                split_offsets: vec![],
914                equality_ids: None,
915                sort_order_id: None,
916                partition_spec_id: 0,
917                first_row_id: None,
918                referenced_data_file: None,
919                content_offset: None,
920                content_size_in_bytes: None,
921            },
922            schema.clone(),
923            partition_spec.clone(),
924        );
925
926        summary_two.add_file(
927            &DataFile {
928                content: DataContentType::Data,
929                file_path: "test.parquet".into(),
930                file_format: DataFileFormat::Parquet,
931                partition: Struct::from_iter(vec![]),
932                record_count: 20,
933                file_size_in_bytes: 200,
934                column_sizes: HashMap::new(),
935                value_counts: HashMap::new(),
936                null_value_counts: HashMap::new(),
937                nan_value_counts: HashMap::new(),
938                lower_bounds: HashMap::new(),
939                upper_bounds: HashMap::new(),
940                key_metadata: None,
941                split_offsets: vec![],
942                equality_ids: None,
943                sort_order_id: None,
944                partition_spec_id: 0,
945                first_row_id: None,
946                referenced_data_file: None,
947                content_offset: None,
948                content_size_in_bytes: None,
949            },
950            schema.clone(),
951            partition_spec.clone(),
952        );
953
954        summary_one.merge(summary_two);
955        let props = summary_one.build();
956        assert_eq!(props.get(ADDED_DATA_FILES).unwrap(), "2");
957        assert_eq!(props.get(ADDED_RECORDS).unwrap(), "30");
958
959        let mut summary_three = SnapshotSummaryCollector::default();
960        let mut summary_four = SnapshotSummaryCollector::default();
961
962        summary_three.add_manifest(&ManifestFile {
963            manifest_path: "test.manifest".to_string(),
964            manifest_length: 0,
965            partition_spec_id: 0,
966            content: ManifestContentType::Data,
967            sequence_number: 0,
968            min_sequence_number: 0,
969            added_snapshot_id: 0,
970            added_files_count: Some(1),
971            existing_files_count: Some(0),
972            deleted_files_count: Some(0),
973            added_rows_count: Some(5),
974            existing_rows_count: Some(0),
975            deleted_rows_count: Some(0),
976            partitions: Some(Vec::new()),
977            key_metadata: None,
978            first_row_id: None,
979        });
980
981        summary_four.add_file(
982            &DataFile {
983                content: DataContentType::Data,
984                file_path: "test.parquet".into(),
985                file_format: DataFileFormat::Parquet,
986                partition: Struct::from_iter(vec![]),
987                record_count: 1,
988                file_size_in_bytes: 10,
989                column_sizes: HashMap::new(),
990                value_counts: HashMap::new(),
991                null_value_counts: HashMap::new(),
992                nan_value_counts: HashMap::new(),
993                lower_bounds: HashMap::new(),
994                upper_bounds: HashMap::new(),
995                key_metadata: None,
996                split_offsets: vec![],
997                equality_ids: None,
998                sort_order_id: None,
999                partition_spec_id: 0,
1000                first_row_id: None,
1001                referenced_data_file: None,
1002                content_offset: None,
1003                content_size_in_bytes: None,
1004            },
1005            schema.clone(),
1006            partition_spec.clone(),
1007        );
1008
1009        summary_three.merge(summary_four);
1010        let props = summary_three.build();
1011
1012        assert_eq!(props.get(ADDED_DATA_FILES).unwrap(), "2");
1013        assert_eq!(props.get(ADDED_RECORDS).unwrap(), "6");
1014        assert!(
1015            props
1016                .iter()
1017                .all(|(k, _)| !k.starts_with(CHANGED_PARTITION_PREFIX))
1018        );
1019    }
1020}