1use std::collections::HashMap;
19use std::ops::Deref;
20use std::sync::{Arc, RwLock};
21
22use futures::StreamExt;
23use futures::channel::mpsc::{Sender, channel};
24use tokio::sync::Notify;
25
26use crate::runtime::spawn;
27use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile};
28use crate::spec::{DataContentType, DataFile, Struct};
29
30#[derive(Debug, Clone)]
32pub(crate) struct DeleteFileIndex {
33 state: Arc<RwLock<DeleteFileIndexState>>,
34}
35
36#[derive(Debug)]
37enum DeleteFileIndexState {
38 Populating(Arc<Notify>),
39 Populated(PopulatedDeleteFileIndex),
40}
41
42#[derive(Debug)]
43struct PopulatedDeleteFileIndex {
44 #[allow(dead_code)]
45 global_equality_deletes: Vec<Arc<DeleteFileContext>>,
46 eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
47 pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
48 }
53
54impl DeleteFileIndex {
55 pub(crate) fn new() -> (DeleteFileIndex, Sender<DeleteFileContext>) {
57 let (tx, rx) = channel(10);
59 let notify = Arc::new(Notify::new());
60 let state = Arc::new(RwLock::new(DeleteFileIndexState::Populating(
61 notify.clone(),
62 )));
63 let delete_file_stream = rx.boxed();
64
65 spawn({
66 let state = state.clone();
67 async move {
68 let delete_files: Vec<DeleteFileContext> =
69 delete_file_stream.collect::<Vec<_>>().await;
70
71 let populated_delete_file_index = PopulatedDeleteFileIndex::new(delete_files);
72
73 {
74 let mut guard = state.write().unwrap();
75 *guard = DeleteFileIndexState::Populated(populated_delete_file_index);
76 }
77 notify.notify_waiters();
78 }
79 });
80
81 (DeleteFileIndex { state }, tx)
82 }
83
84 pub(crate) async fn get_deletes_for_data_file(
86 &self,
87 data_file: &DataFile,
88 seq_num: Option<i64>,
89 ) -> Vec<FileScanTaskDeleteFile> {
90 let notifier = {
91 let guard = self.state.read().unwrap();
92 match *guard {
93 DeleteFileIndexState::Populating(ref notifier) => notifier.clone(),
94 DeleteFileIndexState::Populated(ref index) => {
95 return index.get_deletes_for_data_file(data_file, seq_num);
96 }
97 }
98 };
99
100 notifier.notified().await;
101
102 let guard = self.state.read().unwrap();
103 match guard.deref() {
104 DeleteFileIndexState::Populated(index) => {
105 index.get_deletes_for_data_file(data_file, seq_num)
106 }
107 _ => unreachable!("Cannot be any other state than loaded"),
108 }
109 }
110}
111
112impl PopulatedDeleteFileIndex {
113 fn new(files: Vec<DeleteFileContext>) -> PopulatedDeleteFileIndex {
121 let mut eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
122 HashMap::default();
123 let mut pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
124 HashMap::default();
125
126 let mut global_equality_deletes: Vec<Arc<DeleteFileContext>> = vec![];
127
128 files.into_iter().for_each(|ctx| {
129 let arc_ctx = Arc::new(ctx);
130
131 let partition = arc_ctx.manifest_entry.data_file().partition();
132
133 if partition.fields().is_empty() {
135 if arc_ctx.manifest_entry.content_type() != DataContentType::PositionDeletes {
137 global_equality_deletes.push(arc_ctx);
138 return;
139 }
140 }
141
142 let destination_map = match arc_ctx.manifest_entry.content_type() {
143 DataContentType::PositionDeletes => &mut pos_deletes_by_partition,
144 DataContentType::EqualityDeletes => &mut eq_deletes_by_partition,
145 _ => unreachable!(),
146 };
147
148 destination_map
149 .entry(partition.clone())
150 .and_modify(|entry| {
151 entry.push(arc_ctx.clone());
152 })
153 .or_insert(vec![arc_ctx.clone()]);
154 });
155
156 PopulatedDeleteFileIndex {
157 global_equality_deletes,
158 eq_deletes_by_partition,
159 pos_deletes_by_partition,
160 }
161 }
162
163 fn get_deletes_for_data_file(
165 &self,
166 data_file: &DataFile,
167 seq_num: Option<i64>,
168 ) -> Vec<FileScanTaskDeleteFile> {
169 let mut results = vec![];
170
171 self.global_equality_deletes
172 .iter()
173 .filter(|&delete| {
175 seq_num
176 .map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num))
177 .unwrap_or_else(|| true)
178 })
179 .for_each(|delete| results.push(delete.as_ref().into()));
180
181 if let Some(deletes) = self.eq_deletes_by_partition.get(data_file.partition()) {
182 deletes
183 .iter()
184 .filter(|&delete| {
186 seq_num
187 .map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num))
188 .unwrap_or_else(|| true)
189 && data_file.partition_spec_id == delete.partition_spec_id
190 })
191 .for_each(|delete| results.push(delete.as_ref().into()));
192 }
193
194 if let Some(deletes) = self.pos_deletes_by_partition.get(data_file.partition()) {
199 deletes
200 .iter()
201 .filter(|&delete| {
203 seq_num
204 .map(|seq_num| delete.manifest_entry.sequence_number() >= Some(seq_num))
205 .unwrap_or_else(|| true)
206 && data_file.partition_spec_id == delete.partition_spec_id
207 })
208 .for_each(|delete| results.push(delete.as_ref().into()));
209 }
210
211 results
212 }
213}
214
#[cfg(test)]
mod tests {
    use uuid::Uuid;

    use super::*;
    use crate::spec::{
        DataContentType, DataFileBuilder, DataFileFormat, Literal, ManifestEntry, ManifestStatus,
        Struct,
    };

    /// Unpartitioned equality deletes act as global deletes; unpartitioned
    /// position deletes only apply to files with the same (empty) partition.
    #[test]
    fn test_delete_file_index_unpartitioned() {
        let deletes: Vec<ManifestEntry> = vec![
            build_added_manifest_entry(4, &build_unpartitioned_eq_delete()),
            build_added_manifest_entry(6, &build_unpartitioned_eq_delete()),
            build_added_manifest_entry(5, &build_unpartitioned_pos_delete()),
            build_added_manifest_entry(6, &build_unpartitioned_pos_delete()),
        ];
        let delete_file_paths = entry_paths(&deletes);
        let last = |count: usize| &delete_file_paths[delete_file_paths.len() - count..];

        let delete_file_index = PopulatedDeleteFileIndex::new(into_contexts(deletes, 0));
        let data_file = build_unpartitioned_data_file();

        // Every delete file is newer than sequence numbers 0 and 3.
        assert_eq!(applicable_paths(&delete_file_index, &data_file, 0).len(), 4);
        assert_eq!(applicable_paths(&delete_file_index, &data_file, 3).len(), 4);

        // The equality delete at sequence number 4 no longer applies.
        assert_eq!(
            applicable_paths(&delete_file_index, &data_file, 4),
            *last(3)
        );

        // Position deletes at the same sequence number still apply.
        assert_eq!(
            applicable_paths(&delete_file_index, &data_file, 5),
            *last(3)
        );

        // Only the position delete at sequence number 6 remains.
        assert_eq!(
            applicable_paths(&delete_file_index, &data_file, 6),
            *last(1)
        );

        // A partitioned data file only picks up the global equality deletes.
        let partitioned_file =
            build_partitioned_data_file(&Struct::from_iter([Some(Literal::long(100))]), 1);
        assert_eq!(
            applicable_paths(&delete_file_index, &partitioned_file, 0),
            delete_file_paths[..2]
        );
    }

    /// Partitioned deletes apply only to data files with the same partition
    /// value and the same partition spec id.
    #[test]
    fn test_delete_file_index_partitioned() {
        let partition_one = Struct::from_iter([Some(Literal::long(100))]);
        let spec_id = 1;
        let deletes: Vec<ManifestEntry> = vec![
            build_added_manifest_entry(4, &build_partitioned_eq_delete(&partition_one, spec_id)),
            build_added_manifest_entry(6, &build_partitioned_eq_delete(&partition_one, spec_id)),
            build_added_manifest_entry(5, &build_partitioned_pos_delete(&partition_one, spec_id)),
            build_added_manifest_entry(6, &build_partitioned_pos_delete(&partition_one, spec_id)),
        ];
        let delete_file_paths = entry_paths(&deletes);
        let last = |count: usize| &delete_file_paths[delete_file_paths.len() - count..];

        let delete_file_index = PopulatedDeleteFileIndex::new(into_contexts(deletes, spec_id));

        let partitioned_file =
            build_partitioned_data_file(&Struct::from_iter([Some(Literal::long(100))]), spec_id);

        // Every delete file is newer than sequence numbers 0 and 3.
        assert_eq!(
            applicable_paths(&delete_file_index, &partitioned_file, 0).len(),
            4
        );
        assert_eq!(
            applicable_paths(&delete_file_index, &partitioned_file, 3).len(),
            4
        );

        // The equality delete at sequence number 4 no longer applies.
        assert_eq!(
            applicable_paths(&delete_file_index, &partitioned_file, 4),
            *last(3)
        );

        // Position deletes at the same sequence number still apply.
        assert_eq!(
            applicable_paths(&delete_file_index, &partitioned_file, 5),
            *last(3)
        );

        // Only the position delete at sequence number 6 remains.
        assert_eq!(
            applicable_paths(&delete_file_index, &partitioned_file, 6),
            *last(1)
        );

        // Same spec id, different partition value: nothing applies.
        let partitioned_second_file =
            build_partitioned_data_file(&Struct::from_iter([Some(Literal::long(200))]), 1);
        assert!(applicable_paths(&delete_file_index, &partitioned_second_file, 0).is_empty());

        // Same partition value, different spec id: nothing applies.
        let partitioned_different_spec = build_partitioned_data_file(&partition_one, 2);
        assert!(applicable_paths(&delete_file_index, &partitioned_different_spec, 0).is_empty());
    }

    /// Collects the file path of every manifest entry, preserving order.
    fn entry_paths(entries: &[ManifestEntry]) -> Vec<String> {
        entries
            .iter()
            .map(|file| file.file_path().to_string())
            .collect()
    }

    /// Wraps each manifest entry in a `DeleteFileContext` with the given spec id.
    fn into_contexts(entries: Vec<ManifestEntry>, spec_id: i32) -> Vec<DeleteFileContext> {
        entries
            .into_iter()
            .map(|entry| DeleteFileContext {
                manifest_entry: entry.into(),
                partition_spec_id: spec_id,
            })
            .collect()
    }

    /// Queries the index and returns the paths of the delete files it yields.
    fn applicable_paths(
        index: &PopulatedDeleteFileIndex,
        data_file: &DataFile,
        seq_num: i64,
    ) -> Vec<String> {
        index
            .get_deletes_for_data_file(data_file, Some(seq_num))
            .into_iter()
            .map(|file| file.file_path)
            .collect()
    }

    fn build_unpartitioned_eq_delete() -> DataFile {
        build_partitioned_eq_delete(&Struct::empty(), 0)
    }

    fn build_partitioned_eq_delete(partition: &Struct, spec_id: i32) -> DataFile {
        let path = format!("{}_equality_delete.parquet", Uuid::new_v4());
        DataFileBuilder::default()
            .file_path(path)
            .file_format(DataFileFormat::Parquet)
            .content(DataContentType::EqualityDeletes)
            .equality_ids(Some(vec![1]))
            .record_count(1)
            .partition(partition.clone())
            .partition_spec_id(spec_id)
            .file_size_in_bytes(100)
            .build()
            .unwrap()
    }

    fn build_unpartitioned_pos_delete() -> DataFile {
        build_partitioned_pos_delete(&Struct::empty(), 0)
    }

    fn build_partitioned_pos_delete(partition: &Struct, spec_id: i32) -> DataFile {
        let path = format!("{}-pos-delete.parquet", Uuid::new_v4());
        DataFileBuilder::default()
            .file_path(path)
            .file_format(DataFileFormat::Parquet)
            .content(DataContentType::PositionDeletes)
            .record_count(1)
            .referenced_data_file(Some("/some-data-file.parquet".to_string()))
            .partition(partition.clone())
            .partition_spec_id(spec_id)
            .file_size_in_bytes(100)
            .build()
            .unwrap()
    }

    fn build_unpartitioned_data_file() -> DataFile {
        let path = format!("{}-data.parquet", Uuid::new_v4());
        DataFileBuilder::default()
            .file_path(path)
            .file_format(DataFileFormat::Parquet)
            .content(DataContentType::Data)
            .record_count(100)
            .partition(Struct::empty())
            .partition_spec_id(0)
            .file_size_in_bytes(100)
            .build()
            .unwrap()
    }

    fn build_partitioned_data_file(partition_value: &Struct, spec_id: i32) -> DataFile {
        let path = format!("{}-data.parquet", Uuid::new_v4());
        DataFileBuilder::default()
            .file_path(path)
            .file_format(DataFileFormat::Parquet)
            .content(DataContentType::Data)
            .record_count(100)
            .partition(partition_value.clone())
            .partition_spec_id(spec_id)
            .file_size_in_bytes(100)
            .build()
            .unwrap()
    }

    /// Builds an `Added` manifest entry for `file` with the given sequence number.
    fn build_added_manifest_entry(data_seq_number: i64, file: &DataFile) -> ManifestEntry {
        ManifestEntry::builder()
            .status(ManifestStatus::Added)
            .sequence_number(data_seq_number)
            .data_file(file.clone())
            .build()
    }
}