iceberg/
delete_file_index.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::ops::Deref;
20use std::sync::{Arc, RwLock};
21
22use futures::StreamExt;
23use futures::channel::mpsc::{Sender, channel};
24use tokio::sync::Notify;
25
26use crate::runtime::spawn;
27use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile};
28use crate::spec::{DataContentType, DataFile, Struct};
29
30/// Index of delete files
31#[derive(Debug, Clone)]
32pub(crate) struct DeleteFileIndex {
33    state: Arc<RwLock<DeleteFileIndexState>>,
34}
35
36#[derive(Debug)]
37enum DeleteFileIndexState {
38    Populating(Arc<Notify>),
39    Populated(PopulatedDeleteFileIndex),
40}
41
42#[derive(Debug)]
43struct PopulatedDeleteFileIndex {
44    #[allow(dead_code)]
45    global_equality_deletes: Vec<Arc<DeleteFileContext>>,
46    eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
47    pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
48    // TODO: do we need this?
49    // pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>>,
50
51    // TODO: Deletion Vector support
52}
53
54impl DeleteFileIndex {
55    /// create a new `DeleteFileIndex` along with the sender that populates it with delete files
56    pub(crate) fn new() -> (DeleteFileIndex, Sender<DeleteFileContext>) {
57        // TODO: what should the channel limit be?
58        let (tx, rx) = channel(10);
59        let notify = Arc::new(Notify::new());
60        let state = Arc::new(RwLock::new(DeleteFileIndexState::Populating(
61            notify.clone(),
62        )));
63        let delete_file_stream = rx.boxed();
64
65        spawn({
66            let state = state.clone();
67            async move {
68                let delete_files: Vec<DeleteFileContext> =
69                    delete_file_stream.collect::<Vec<_>>().await;
70
71                let populated_delete_file_index = PopulatedDeleteFileIndex::new(delete_files);
72
73                {
74                    let mut guard = state.write().unwrap();
75                    *guard = DeleteFileIndexState::Populated(populated_delete_file_index);
76                }
77                notify.notify_waiters();
78            }
79        });
80
81        (DeleteFileIndex { state }, tx)
82    }
83
84    /// Gets all the delete files that apply to the specified data file.
85    pub(crate) async fn get_deletes_for_data_file(
86        &self,
87        data_file: &DataFile,
88        seq_num: Option<i64>,
89    ) -> Vec<FileScanTaskDeleteFile> {
90        let notifier = {
91            let guard = self.state.read().unwrap();
92            match *guard {
93                DeleteFileIndexState::Populating(ref notifier) => notifier.clone(),
94                DeleteFileIndexState::Populated(ref index) => {
95                    return index.get_deletes_for_data_file(data_file, seq_num);
96                }
97            }
98        };
99
100        notifier.notified().await;
101
102        let guard = self.state.read().unwrap();
103        match guard.deref() {
104            DeleteFileIndexState::Populated(index) => {
105                index.get_deletes_for_data_file(data_file, seq_num)
106            }
107            _ => unreachable!("Cannot be any other state than loaded"),
108        }
109    }
110}
111
112impl PopulatedDeleteFileIndex {
113    /// Creates a new populated delete file index from a list of delete file contexts, which
114    /// allows for fast lookup when determining which delete files apply to a given data file.
115    ///
116    /// 1. The partition information is extracted from each delete file's manifest entry.
117    /// 2. If the partition is empty and the delete file is not a positional delete,
118    ///    it is added to the `global_equality_deletes` vector
119    /// 3. Otherwise, the delete file is added to one of two hash maps based on its content type.
120    fn new(files: Vec<DeleteFileContext>) -> PopulatedDeleteFileIndex {
121        let mut eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
122            HashMap::default();
123        let mut pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
124            HashMap::default();
125
126        let mut global_equality_deletes: Vec<Arc<DeleteFileContext>> = vec![];
127
128        files.into_iter().for_each(|ctx| {
129            let arc_ctx = Arc::new(ctx);
130
131            let partition = arc_ctx.manifest_entry.data_file().partition();
132
133            // The spec states that "Equality delete files stored with an unpartitioned spec are applied as global deletes".
134            if partition.fields().is_empty() {
135                // TODO: confirm we're good to skip here if we encounter a pos del
136                if arc_ctx.manifest_entry.content_type() != DataContentType::PositionDeletes {
137                    global_equality_deletes.push(arc_ctx);
138                    return;
139                }
140            }
141
142            let destination_map = match arc_ctx.manifest_entry.content_type() {
143                DataContentType::PositionDeletes => &mut pos_deletes_by_partition,
144                DataContentType::EqualityDeletes => &mut eq_deletes_by_partition,
145                _ => unreachable!(),
146            };
147
148            destination_map
149                .entry(partition.clone())
150                .and_modify(|entry| {
151                    entry.push(arc_ctx.clone());
152                })
153                .or_insert(vec![arc_ctx.clone()]);
154        });
155
156        PopulatedDeleteFileIndex {
157            global_equality_deletes,
158            eq_deletes_by_partition,
159            pos_deletes_by_partition,
160        }
161    }
162
163    /// Determine all the delete files that apply to the provided `DataFile`.
164    fn get_deletes_for_data_file(
165        &self,
166        data_file: &DataFile,
167        seq_num: Option<i64>,
168    ) -> Vec<FileScanTaskDeleteFile> {
169        let mut results = vec![];
170
171        self.global_equality_deletes
172            .iter()
173            // filter that returns true if the provided delete file's sequence number is **greater than** `seq_num`
174            .filter(|&delete| {
175                seq_num
176                    .map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num))
177                    .unwrap_or_else(|| true)
178            })
179            .for_each(|delete| results.push(delete.as_ref().into()));
180
181        if let Some(deletes) = self.eq_deletes_by_partition.get(data_file.partition()) {
182            deletes
183                .iter()
184                // filter that returns true if the provided delete file's sequence number is **greater than** `seq_num`
185                .filter(|&delete| {
186                    seq_num
187                        .map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num))
188                        .unwrap_or_else(|| true)
189                        && data_file.partition_spec_id == delete.partition_spec_id
190                })
191                .for_each(|delete| results.push(delete.as_ref().into()));
192        }
193
194        // TODO: the spec states that:
195        //     "The data file's file_path is equal to the delete file's referenced_data_file if it is non-null".
196        //     we're not yet doing that here. The referenced data file's name will also be present in the positional
197        //     delete file's file path column.
198        if let Some(deletes) = self.pos_deletes_by_partition.get(data_file.partition()) {
199            deletes
200                .iter()
201                // filter that returns true if the provided delete file's sequence number is **greater than or equal to** `seq_num`
202                .filter(|&delete| {
203                    seq_num
204                        .map(|seq_num| delete.manifest_entry.sequence_number() >= Some(seq_num))
205                        .unwrap_or_else(|| true)
206                        && data_file.partition_spec_id == delete.partition_spec_id
207                })
208                .for_each(|delete| results.push(delete.as_ref().into()));
209        }
210
211        results
212    }
213}
214
#[cfg(test)]
mod tests {
    use uuid::Uuid;

    use super::*;
    use crate::spec::{
        DataContentType, DataFileBuilder, DataFileFormat, Literal, ManifestEntry, ManifestStatus,
        Struct,
    };

    /// Unpartitioned equality deletes act as global deletes; unpartitioned positional deletes
    /// only match data files with an empty partition tuple.
    #[test]
    fn test_delete_file_index_unpartitioned() {
        let entries = vec![
            build_added_manifest_entry(4, &build_unpartitioned_eq_delete()),
            build_added_manifest_entry(6, &build_unpartitioned_eq_delete()),
            build_added_manifest_entry(5, &build_unpartitioned_pos_delete()),
            build_added_manifest_entry(6, &build_unpartitioned_pos_delete()),
        ];
        let all_paths = entry_paths(&entries);
        let index = build_index(entries, 0);

        let data_file = build_unpartitioned_data_file();

        // All deletes apply to sequence 0
        assert_eq!(
            index.get_deletes_for_data_file(&data_file, Some(0)).len(),
            4
        );

        // All deletes apply to sequence 3
        assert_eq!(
            index.get_deletes_for_data_file(&data_file, Some(3)).len(),
            4
        );

        // Last 3 deletes apply to sequence 4 (the equality delete at 4 is not strictly newer)
        assert_eq!(applicable_paths(&index, &data_file, 4), all_paths[1..]);

        // Last 3 deletes apply to sequence 5 (the position delete at 5 still applies)
        assert_eq!(applicable_paths(&index, &data_file, 5), all_paths[1..]);

        // Only the last position delete applies to sequence 6
        assert_eq!(applicable_paths(&index, &data_file, 6), all_paths[3..]);

        // The 2 global equality deletes should match against any partitioned file
        let partitioned_file =
            build_partitioned_data_file(&Struct::from_iter([Some(Literal::long(100))]), 1);
        assert_eq!(
            applicable_paths(&index, &partitioned_file, 0),
            all_paths[..2]
        );
    }

    /// Partitioned deletes only match data files with the same partition tuple and spec id.
    #[test]
    fn test_delete_file_index_partitioned() {
        let partition_one = Struct::from_iter([Some(Literal::long(100))]);
        let spec_id = 1;
        let entries = vec![
            build_added_manifest_entry(4, &build_partitioned_eq_delete(&partition_one, spec_id)),
            build_added_manifest_entry(6, &build_partitioned_eq_delete(&partition_one, spec_id)),
            build_added_manifest_entry(5, &build_partitioned_pos_delete(&partition_one, spec_id)),
            build_added_manifest_entry(6, &build_partitioned_pos_delete(&partition_one, spec_id)),
        ];
        let all_paths = entry_paths(&entries);
        let index = build_index(entries, spec_id);

        let partitioned_file =
            build_partitioned_data_file(&Struct::from_iter([Some(Literal::long(100))]), spec_id);

        // All deletes apply to sequence 0
        assert_eq!(
            index
                .get_deletes_for_data_file(&partitioned_file, Some(0))
                .len(),
            4
        );

        // All deletes apply to sequence 3
        assert_eq!(
            index
                .get_deletes_for_data_file(&partitioned_file, Some(3))
                .len(),
            4
        );

        // Last 3 deletes apply to sequence 4
        assert_eq!(
            applicable_paths(&index, &partitioned_file, 4),
            all_paths[1..]
        );

        // Last 3 deletes apply to sequence 5
        assert_eq!(
            applicable_paths(&index, &partitioned_file, 5),
            all_paths[1..]
        );

        // Only the last position delete applies to sequence 6
        assert_eq!(
            applicable_paths(&index, &partitioned_file, 6),
            all_paths[3..]
        );

        // Data file with different partition tuples does not match any delete files
        let partitioned_second_file =
            build_partitioned_data_file(&Struct::from_iter([Some(Literal::long(200))]), 1);
        assert!(applicable_paths(&index, &partitioned_second_file, 0).is_empty());

        // Data file with same tuple but different spec ID does not match any delete files
        let partitioned_different_spec = build_partitioned_data_file(&partition_one, 2);
        assert!(applicable_paths(&index, &partitioned_different_spec, 0).is_empty());
    }

    /// Collects the file path of every manifest entry, in order.
    fn entry_paths(entries: &[ManifestEntry]) -> Vec<String> {
        entries
            .iter()
            .map(|entry| entry.file_path().to_string())
            .collect()
    }

    /// Wraps each entry in a `DeleteFileContext` with the given spec id and indexes them.
    fn build_index(entries: Vec<ManifestEntry>, partition_spec_id: i32) -> PopulatedDeleteFileIndex {
        let contexts: Vec<DeleteFileContext> = entries
            .into_iter()
            .map(|entry| DeleteFileContext {
                manifest_entry: entry.into(),
                partition_spec_id,
            })
            .collect();

        PopulatedDeleteFileIndex::new(contexts)
    }

    /// Paths of the delete files the index reports for `data_file` at sequence `seq_num`.
    fn applicable_paths(
        index: &PopulatedDeleteFileIndex,
        data_file: &DataFile,
        seq_num: i64,
    ) -> Vec<String> {
        index
            .get_deletes_for_data_file(data_file, Some(seq_num))
            .into_iter()
            .map(|delete| delete.file_path)
            .collect()
    }

    /// Equality delete file with an empty partition tuple and spec id 0.
    fn build_unpartitioned_eq_delete() -> DataFile {
        build_partitioned_eq_delete(&Struct::empty(), 0)
    }

    /// Equality delete file (on field id 1) for the given partition tuple and spec id.
    fn build_partitioned_eq_delete(partition: &Struct, spec_id: i32) -> DataFile {
        let file_path = format!("{}_equality_delete.parquet", Uuid::new_v4());

        DataFileBuilder::default()
            .file_path(file_path)
            .file_format(DataFileFormat::Parquet)
            .content(DataContentType::EqualityDeletes)
            .equality_ids(Some(vec![1]))
            .record_count(1)
            .partition(partition.clone())
            .partition_spec_id(spec_id)
            .file_size_in_bytes(100)
            .build()
            .unwrap()
    }

    /// Positional delete file with an empty partition tuple and spec id 0.
    fn build_unpartitioned_pos_delete() -> DataFile {
        build_partitioned_pos_delete(&Struct::empty(), 0)
    }

    /// Positional delete file for the given partition tuple and spec id.
    fn build_partitioned_pos_delete(partition: &Struct, spec_id: i32) -> DataFile {
        let file_path = format!("{}-pos-delete.parquet", Uuid::new_v4());

        DataFileBuilder::default()
            .file_path(file_path)
            .file_format(DataFileFormat::Parquet)
            .content(DataContentType::PositionDeletes)
            .record_count(1)
            .referenced_data_file(Some("/some-data-file.parquet".to_string()))
            .partition(partition.clone())
            .partition_spec_id(spec_id)
            .file_size_in_bytes(100)
            .build()
            .unwrap()
    }

    /// Data file with an empty partition tuple and spec id 0.
    fn build_unpartitioned_data_file() -> DataFile {
        build_partitioned_data_file(&Struct::empty(), 0)
    }

    /// Data file for the given partition tuple and spec id.
    fn build_partitioned_data_file(partition_value: &Struct, spec_id: i32) -> DataFile {
        let file_path = format!("{}-data.parquet", Uuid::new_v4());

        DataFileBuilder::default()
            .file_path(file_path)
            .file_format(DataFileFormat::Parquet)
            .content(DataContentType::Data)
            .record_count(100)
            .partition(partition_value.clone())
            .partition_spec_id(spec_id)
            .file_size_in_bytes(100)
            .build()
            .unwrap()
    }

    /// Manifest entry with status `Added` carrying `file` at the given sequence number.
    fn build_added_manifest_entry(data_seq_number: i64, file: &DataFile) -> ManifestEntry {
        ManifestEntry::builder()
            .status(ManifestStatus::Added)
            .sequence_number(data_seq_number)
            .data_file(file.clone())
            .build()
    }
}