iceberg/
delete_vector.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::ops::BitOrAssign;
19
20use roaring::RoaringTreemap;
21use roaring::bitmap::Iter;
22use roaring::treemap::BitmapIter;
23
24use crate::{Error, ErrorKind, Result};
25
26#[derive(Debug, Default)]
27pub struct DeleteVector {
28    inner: RoaringTreemap,
29}
30
31impl DeleteVector {
32    #[allow(unused)]
33    pub fn new(roaring_treemap: RoaringTreemap) -> DeleteVector {
34        DeleteVector {
35            inner: roaring_treemap,
36        }
37    }
38
39    pub fn iter(&self) -> DeleteVectorIterator<'_> {
40        let outer = self.inner.bitmaps();
41        DeleteVectorIterator { outer, inner: None }
42    }
43
44    pub fn insert(&mut self, pos: u64) -> bool {
45        self.inner.insert(pos)
46    }
47
48    /// Marks the given `positions` as deleted and returns the number of elements appended.
49    ///
50    /// The input slice must be strictly ordered in ascending order, and every value must be greater than all existing values already in the set.
51    ///
52    /// # Errors
53    ///
54    /// Returns an error if the precondition is not met.
55    #[allow(dead_code)]
56    pub fn insert_positions(&mut self, positions: &[u64]) -> Result<usize> {
57        if let Err(err) = self.inner.append(positions.iter().copied()) {
58            return Err(Error::new(
59                ErrorKind::PreconditionFailed,
60                "failed to marks rows as deleted".to_string(),
61            )
62            .with_source(err));
63        }
64        Ok(positions.len())
65    }
66
67    #[allow(unused)]
68    pub fn len(&self) -> u64 {
69        self.inner.len()
70    }
71}
72
73// Ideally, we'd just wrap `roaring::RoaringTreemap`'s iterator, `roaring::treemap::Iter` here.
74// But right now, it does not have a corresponding implementation of `roaring::bitmap::Iter::advance_to`,
75// which is very handy in ArrowReader::build_deletes_row_selection.
76// There is a PR open on roaring to add this (https://github.com/RoaringBitmap/roaring-rs/pull/314)
77// and if that gets merged then we can simplify `DeleteVectorIterator` here, refactoring `advance_to`
78// to just a wrapper around the underlying iterator's method.
79pub struct DeleteVectorIterator<'a> {
80    // NB: `BitMapIter` was only exposed publicly in https://github.com/RoaringBitmap/roaring-rs/pull/316
81    // which is not yet released. As a consequence our Cargo.toml temporarily uses a git reference for
82    // the roaring dependency.
83    outer: BitmapIter<'a>,
84    inner: Option<DeleteVectorIteratorInner<'a>>,
85}
86
87struct DeleteVectorIteratorInner<'a> {
88    high_bits: u32,
89    bitmap_iter: Iter<'a>,
90}
91
92impl Iterator for DeleteVectorIterator<'_> {
93    type Item = u64;
94
95    fn next(&mut self) -> Option<Self::Item> {
96        if let Some(inner) = &mut self.inner
97            && let Some(inner_next) = inner.bitmap_iter.next()
98        {
99            return Some(u64::from(inner.high_bits) << 32 | u64::from(inner_next));
100        }
101
102        if let Some((high_bits, next_bitmap)) = self.outer.next() {
103            self.inner = Some(DeleteVectorIteratorInner {
104                high_bits,
105                bitmap_iter: next_bitmap.iter(),
106            })
107        } else {
108            return None;
109        }
110
111        self.next()
112    }
113}
114
115impl DeleteVectorIterator<'_> {
116    pub fn advance_to(&mut self, pos: u64) {
117        let hi = (pos >> 32) as u32;
118        let lo = pos as u32;
119
120        let Some(ref mut inner) = self.inner else {
121            return;
122        };
123
124        while inner.high_bits < hi {
125            let Some((next_hi, next_bitmap)) = self.outer.next() else {
126                return;
127            };
128
129            *inner = DeleteVectorIteratorInner {
130                high_bits: next_hi,
131                bitmap_iter: next_bitmap.iter(),
132            }
133        }
134
135        inner.bitmap_iter.advance_to(lo);
136    }
137}
138
139impl BitOrAssign for DeleteVector {
140    fn bitor_assign(&mut self, other: Self) {
141        self.inner.bitor_assign(&other.inner);
142    }
143}
144
#[cfg(test)]
mod tests {
    use super::*;

    /// Testing scenario: single inserts report whether the position was new, and iteration
    /// yields each inserted position exactly once.
    #[test]
    fn test_insertion_and_iteration() {
        let mut delete_vector = DeleteVector::default();
        for (pos, newly_inserted) in [(42, true), (100, true), (42, false)] {
            assert_eq!(delete_vector.insert(pos), newly_inserted);
        }

        let mut yielded = delete_vector.iter().collect::<Vec<u64>>();
        yielded.sort();
        assert_eq!(yielded, vec![42, 100]);
        assert_eq!(delete_vector.len(), 2);
    }

    /// Testing scenario: bulk insertion of strictly increasing positions succeeds, including
    /// a position that does not fit in 32 bits.
    #[test]
    fn test_successful_insert_positions() {
        let expected: Vec<u64> = vec![1, 2, 3, 1000, 1 << 33];
        let mut delete_vector = DeleteVector::default();

        let appended = delete_vector.insert_positions(&expected).unwrap();
        assert_eq!(appended, 5);

        let mut yielded = delete_vector.iter().collect::<Vec<u64>>();
        yielded.sort();
        assert_eq!(yielded, expected);
    }

    /// Testing scenario: bulk insertion fails because input positions are not strictly increasing.
    #[test]
    fn test_failed_insertion_unsorted_elements() {
        let mut delete_vector = DeleteVector::default();
        let unsorted = vec![1, 3, 5, 4];

        assert!(delete_vector.insert_positions(&unsorted).is_err());
    }

    /// Testing scenario: bulk insertion fails because input positions have intersection with existing ones.
    #[test]
    fn test_failed_insertion_with_intersection() {
        let mut delete_vector = DeleteVector::default();
        let initial = vec![1, 3, 5];
        let appended = delete_vector.insert_positions(&initial).unwrap();
        assert_eq!(appended, 3);

        let overlapping = delete_vector.insert_positions(&[2, 4]);
        assert!(overlapping.is_err());
    }

    /// Testing scenario: bulk insertion fails because input positions have duplicates.
    #[test]
    fn test_failed_insertion_duplicate_elements() {
        let mut delete_vector = DeleteVector::default();
        let with_duplicate = vec![1, 3, 5, 5];

        assert!(delete_vector.insert_positions(&with_duplicate).is_err());
    }
}