iceberg/catalog/
utils.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Utility functions for catalog operations.
19
20use std::collections::HashSet;
21
22use futures::{TryStreamExt, stream};
23
24use crate::Result;
25use crate::io::FileIO;
26use crate::spec::TableMetadata;
27
/// Upper bound on how many per-file tasks (for example, reading a manifest and
/// deleting the files it references) this module keeps in flight at once.
const DELETE_CONCURRENCY: usize = 10;
29
30/// Deletes all data and metadata files referenced by the given table metadata.
31///
32/// This mirrors the Java implementation's `CatalogUtil.dropTableData`.
33/// It collects all manifest files, manifest lists, previous metadata files,
34/// statistics files, and partition statistics files, then deletes them.
35///
36/// Data files within manifests are only deleted if the `gc.enabled` table
37/// property is `true` (the default), to avoid corrupting other tables that
38/// may share the same data files.
39pub async fn drop_table_data(
40    io: &FileIO,
41    metadata: &TableMetadata,
42    metadata_location: Option<&str>,
43) -> Result<()> {
44    let mut manifest_lists_to_delete: HashSet<String> = HashSet::new();
45    let mut manifests_to_delete: HashSet<String> = HashSet::new();
46
47    // Load all manifest lists concurrently
48    let results: Vec<_> =
49        futures::future::try_join_all(metadata.snapshots().map(|snapshot| async {
50            let manifest_list = snapshot.load_manifest_list(io, metadata).await?;
51            Ok::<_, crate::Error>((snapshot.manifest_list().to_string(), manifest_list))
52        }))
53        .await?;
54
55    for (manifest_list_location, manifest_list) in results {
56        if !manifest_list_location.is_empty() {
57            manifest_lists_to_delete.insert(manifest_list_location);
58        }
59        for manifest_file in manifest_list.entries() {
60            manifests_to_delete.insert(manifest_file.manifest_path.clone());
61        }
62    }
63
64    // Delete data files only if gc.enabled is true, to avoid corrupting shared tables
65    if metadata.table_properties()?.gc_enabled {
66        delete_data_files(io, &manifests_to_delete).await?;
67    }
68
69    // Delete manifest files
70    io.delete_stream(stream::iter(manifests_to_delete)).await?;
71
72    // Delete manifest lists
73    io.delete_stream(stream::iter(manifest_lists_to_delete))
74        .await?;
75
76    // Delete previous metadata files
77    let prev_metadata_paths: Vec<String> = metadata
78        .metadata_log()
79        .iter()
80        .map(|m| m.metadata_file.clone())
81        .collect();
82    io.delete_stream(stream::iter(prev_metadata_paths)).await?;
83
84    // Delete statistics files
85    let stats_paths: Vec<String> = metadata
86        .statistics_iter()
87        .map(|s| s.statistics_path.clone())
88        .collect();
89    io.delete_stream(stream::iter(stats_paths)).await?;
90
91    // Delete partition statistics files
92    let partition_stats_paths: Vec<String> = metadata
93        .partition_statistics_iter()
94        .map(|s| s.statistics_path.clone())
95        .collect();
96    io.delete_stream(stream::iter(partition_stats_paths))
97        .await?;
98
99    // Delete the current metadata file
100    if let Some(location) = metadata_location {
101        io.delete(location).await?;
102    }
103
104    Ok(())
105}
106
107/// Reads manifests concurrently and deletes the data files referenced within.
108async fn delete_data_files(io: &FileIO, manifest_paths: &HashSet<String>) -> Result<()> {
109    stream::iter(manifest_paths.iter().map(Ok))
110        .try_for_each_concurrent(DELETE_CONCURRENCY, |manifest_path| async move {
111            let input = io.new_input(manifest_path)?;
112            let manifest_content = input.read().await?;
113            let manifest = crate::spec::Manifest::parse_avro(&manifest_content)?;
114
115            let data_file_paths = manifest
116                .entries()
117                .iter()
118                .map(|entry| entry.data_file.file_path().to_string())
119                .collect::<Vec<_>>();
120
121            io.delete_stream(stream::iter(data_file_paths)).await
122        })
123        .await
124}