iceberg/catalog/
utils.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Utility functions for catalog operations.
19
20use std::collections::HashSet;
21
22use futures::{TryStreamExt, stream};
23
24use crate::Result;
25use crate::io::FileIO;
26use crate::table::Table;
27
/// Upper bound on the number of manifests that `delete_data_files` reads and
/// processes at the same time.
const DELETE_CONCURRENCY: usize = 10;
29
30/// Deletes all data and metadata files referenced by the given table metadata.
31///
32/// This mirrors the Java implementation's `CatalogUtil.dropTableData`.
33/// It collects all manifest files, manifest lists, previous metadata files,
34/// statistics files, and partition statistics files, then deletes them.
35///
36/// Data files within manifests are only deleted if the `gc.enabled` table
37/// property is `true` (the default), to avoid corrupting other tables that
38/// may share the same data files.
39pub async fn drop_table_data(table_info: &Table) -> Result<()> {
40    let mut manifest_lists_to_delete: HashSet<String> = HashSet::new();
41    let mut manifests_to_delete: HashSet<String> = HashSet::new();
42
43    let metadata = table_info.metadata_ref();
44    let io = table_info.file_io();
45    // Load all manifest lists concurrently
46    let results: Vec<_> =
47        futures::future::try_join_all(metadata.snapshots().map(|snapshot| async {
48            let manifest_list = table_info.manifest_list_reader(snapshot).load().await?;
49            Ok::<_, crate::Error>((snapshot.manifest_list().to_string(), manifest_list))
50        }))
51        .await?;
52
53    for (manifest_list_location, manifest_list) in results {
54        if !manifest_list_location.is_empty() {
55            manifest_lists_to_delete.insert(manifest_list_location);
56        }
57        for manifest_file in manifest_list.entries() {
58            manifests_to_delete.insert(manifest_file.manifest_path.clone());
59        }
60    }
61
62    // Delete data files only if gc.enabled is true, to avoid corrupting shared tables
63    if metadata.table_properties()?.gc_enabled {
64        delete_data_files(io, &manifests_to_delete).await?;
65    }
66
67    // Delete manifest files
68    io.delete_stream(stream::iter(manifests_to_delete)).await?;
69
70    // Delete manifest lists
71    io.delete_stream(stream::iter(manifest_lists_to_delete))
72        .await?;
73
74    // Delete previous metadata files
75    let prev_metadata_paths: Vec<String> = metadata
76        .metadata_log()
77        .iter()
78        .map(|m| m.metadata_file.clone())
79        .collect();
80    io.delete_stream(stream::iter(prev_metadata_paths)).await?;
81
82    // Delete statistics files
83    let stats_paths: Vec<String> = metadata
84        .statistics_iter()
85        .map(|s| s.statistics_path.clone())
86        .collect();
87    io.delete_stream(stream::iter(stats_paths)).await?;
88
89    // Delete partition statistics files
90    let partition_stats_paths: Vec<String> = metadata
91        .partition_statistics_iter()
92        .map(|s| s.statistics_path.clone())
93        .collect();
94    io.delete_stream(stream::iter(partition_stats_paths))
95        .await?;
96
97    // Delete the current metadata file
98    if let Some(location) = table_info.metadata_location() {
99        io.delete(location).await?;
100    }
101
102    Ok(())
103}
104
105/// Reads manifests concurrently and deletes the data files referenced within.
106async fn delete_data_files(io: &FileIO, manifest_paths: &HashSet<String>) -> Result<()> {
107    stream::iter(manifest_paths.iter().map(Ok))
108        .try_for_each_concurrent(DELETE_CONCURRENCY, |manifest_path| async move {
109            let input = io.new_input(manifest_path)?;
110            let manifest_content = input.read().await?;
111            let manifest = crate::spec::Manifest::parse_avro(&manifest_content)?;
112
113            let data_file_paths = manifest
114                .entries()
115                .iter()
116                .map(|entry| entry.data_file.file_path().to_string())
117                .collect::<Vec<_>>();
118
119            io.delete_stream(stream::iter(data_file_paths)).await
120        })
121        .await
122}