1use std::collections::HashSet;
21
22use futures::{TryStreamExt, stream};
23
24use crate::Result;
25use crate::io::FileIO;
26use crate::spec::TableMetadata;
27
/// Upper bound on how many manifests `delete_data_files` processes at the same time.
///
/// It is passed as the concurrency limit to `try_for_each_concurrent`.
const DELETE_CONCURRENCY: usize = 10;
29
30pub async fn drop_table_data(
40 io: &FileIO,
41 metadata: &TableMetadata,
42 metadata_location: Option<&str>,
43) -> Result<()> {
44 let mut manifest_lists_to_delete: HashSet<String> = HashSet::new();
45 let mut manifests_to_delete: HashSet<String> = HashSet::new();
46
47 let results: Vec<_> =
49 futures::future::try_join_all(metadata.snapshots().map(|snapshot| async {
50 let manifest_list = snapshot.load_manifest_list(io, metadata).await?;
51 Ok::<_, crate::Error>((snapshot.manifest_list().to_string(), manifest_list))
52 }))
53 .await?;
54
55 for (manifest_list_location, manifest_list) in results {
56 if !manifest_list_location.is_empty() {
57 manifest_lists_to_delete.insert(manifest_list_location);
58 }
59 for manifest_file in manifest_list.entries() {
60 manifests_to_delete.insert(manifest_file.manifest_path.clone());
61 }
62 }
63
64 if metadata.table_properties()?.gc_enabled {
66 delete_data_files(io, &manifests_to_delete).await?;
67 }
68
69 io.delete_stream(stream::iter(manifests_to_delete)).await?;
71
72 io.delete_stream(stream::iter(manifest_lists_to_delete))
74 .await?;
75
76 let prev_metadata_paths: Vec<String> = metadata
78 .metadata_log()
79 .iter()
80 .map(|m| m.metadata_file.clone())
81 .collect();
82 io.delete_stream(stream::iter(prev_metadata_paths)).await?;
83
84 let stats_paths: Vec<String> = metadata
86 .statistics_iter()
87 .map(|s| s.statistics_path.clone())
88 .collect();
89 io.delete_stream(stream::iter(stats_paths)).await?;
90
91 let partition_stats_paths: Vec<String> = metadata
93 .partition_statistics_iter()
94 .map(|s| s.statistics_path.clone())
95 .collect();
96 io.delete_stream(stream::iter(partition_stats_paths))
97 .await?;
98
99 if let Some(location) = metadata_location {
101 io.delete(location).await?;
102 }
103
104 Ok(())
105}
106
107async fn delete_data_files(io: &FileIO, manifest_paths: &HashSet<String>) -> Result<()> {
109 stream::iter(manifest_paths.iter().map(Ok))
110 .try_for_each_concurrent(DELETE_CONCURRENCY, |manifest_path| async move {
111 let input = io.new_input(manifest_path)?;
112 let manifest_content = input.read().await?;
113 let manifest = crate::spec::Manifest::parse_avro(&manifest_content)?;
114
115 let data_file_paths = manifest
116 .entries()
117 .iter()
118 .map(|entry| entry.data_file.file_path().to_string())
119 .collect::<Vec<_>>();
120
121 io.delete_stream(stream::iter(data_file_paths)).await
122 })
123 .await
124}