1use std::collections::HashSet;
21
22use futures::{TryStreamExt, stream};
23
24use crate::Result;
25use crate::io::FileIO;
26use crate::table::Table;
27
/// Maximum number of manifests that `delete_data_files` processes concurrently.
const DELETE_CONCURRENCY: usize = 10;
29
30pub async fn drop_table_data(table_info: &Table) -> Result<()> {
40 let mut manifest_lists_to_delete: HashSet<String> = HashSet::new();
41 let mut manifests_to_delete: HashSet<String> = HashSet::new();
42
43 let metadata = table_info.metadata_ref();
44 let io = table_info.file_io();
45 let results: Vec<_> =
47 futures::future::try_join_all(metadata.snapshots().map(|snapshot| async {
48 let manifest_list = table_info.manifest_list_reader(snapshot).load().await?;
49 Ok::<_, crate::Error>((snapshot.manifest_list().to_string(), manifest_list))
50 }))
51 .await?;
52
53 for (manifest_list_location, manifest_list) in results {
54 if !manifest_list_location.is_empty() {
55 manifest_lists_to_delete.insert(manifest_list_location);
56 }
57 for manifest_file in manifest_list.entries() {
58 manifests_to_delete.insert(manifest_file.manifest_path.clone());
59 }
60 }
61
62 if metadata.table_properties()?.gc_enabled {
64 delete_data_files(io, &manifests_to_delete).await?;
65 }
66
67 io.delete_stream(stream::iter(manifests_to_delete)).await?;
69
70 io.delete_stream(stream::iter(manifest_lists_to_delete))
72 .await?;
73
74 let prev_metadata_paths: Vec<String> = metadata
76 .metadata_log()
77 .iter()
78 .map(|m| m.metadata_file.clone())
79 .collect();
80 io.delete_stream(stream::iter(prev_metadata_paths)).await?;
81
82 let stats_paths: Vec<String> = metadata
84 .statistics_iter()
85 .map(|s| s.statistics_path.clone())
86 .collect();
87 io.delete_stream(stream::iter(stats_paths)).await?;
88
89 let partition_stats_paths: Vec<String> = metadata
91 .partition_statistics_iter()
92 .map(|s| s.statistics_path.clone())
93 .collect();
94 io.delete_stream(stream::iter(partition_stats_paths))
95 .await?;
96
97 if let Some(location) = table_info.metadata_location() {
99 io.delete(location).await?;
100 }
101
102 Ok(())
103}
104
105async fn delete_data_files(io: &FileIO, manifest_paths: &HashSet<String>) -> Result<()> {
107 stream::iter(manifest_paths.iter().map(Ok))
108 .try_for_each_concurrent(DELETE_CONCURRENCY, |manifest_path| async move {
109 let input = io.new_input(manifest_path)?;
110 let manifest_content = input.read().await?;
111 let manifest = crate::spec::Manifest::parse_avro(&manifest_content)?;
112
113 let data_file_paths = manifest
114 .entries()
115 .iter()
116 .map(|entry| entry.data_file.file_path().to_string())
117 .collect::<Vec<_>>();
118
119 io.delete_stream(stream::iter(data_file_paths)).await
120 })
121 .await
122}