Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: handle deletions in take #3360

Merged
merged 5 commits into from
Jan 9, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 91 additions & 1 deletion rust/lance-core/src/utils/deletion.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{collections::HashSet, ops::Range};
use std::{collections::HashSet, ops::Range, sync::Arc};

use arrow_array::BooleanArray;
use deepsize::{Context, DeepSizeOf};
@@ -60,6 +60,14 @@ impl DeletionVector {
}
}

fn range_cardinality(&self, range: Range<u32>) -> u64 {
match self {
Self::NoDeletions => 0,
Self::Set(set) => range.fold(0, |acc, i| acc + set.contains(&i) as u64),
Self::Bitmap(bitmap) => bitmap.range_cardinality(range),
}
}

pub fn iter(&self) -> Box<dyn Iterator<Item = u32> + Send + '_> {
match self {
Self::NoDeletions => Box::new(std::iter::empty()),
@@ -115,6 +123,64 @@ impl DeletionVector {
}
}

/// Maps a naive offset into a fragment to the local row offset that is
/// not deleted.
///
/// For example, if the deletion vector is [0, 1, 2], then the mapping
/// would be:
///
/// - 0 -> 3
/// - 1 -> 4
/// - 2 -> 5
///
/// and so on.
///
/// This expects a monotonically increasing sequence of input offsets. State
/// is re-used between calls to `map_offset` to make the mapping more efficient.
pub struct OffsetMapper {
dv: Arc<DeletionVector>,
left: u32,
last_diff: u32,
}

impl OffsetMapper {
pub fn new(dv: Arc<DeletionVector>) -> Self {
Self {
dv,
left: 0,
last_diff: 0,
}
}

pub fn map_offset(&mut self, offset: u32) -> u32 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see we also have a judgment in fragement.rs here (but missed the judgment of discrete offset). Can we find one way to converge these judgments into one place?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could probably re-implement that other function in terms of this one, if we wanted. Though not sure if it is worth while.

// The best initial guess is the offset + last diff. That's the right
// answer if there are no deletions in the range between the last
// offset and the current one.
let mut mid = offset + self.last_diff;
let mut right = offset + self.dv.len() as u32;
loop {
let deleted_in_range = self.dv.range_cardinality(0..(mid + 1)) as u32;
match mid.cmp(&(offset + deleted_in_range)) {
std::cmp::Ordering::Equal if !self.dv.contains(mid) => {
self.last_diff = mid - offset;
return mid;
}
std::cmp::Ordering::Less => {
assert_ne!(self.left, mid + 1);
self.left = mid + 1;
mid = self.left + (right - self.left) / 2;
}
// There are cases where the mid is deleted but also equal in
// comparison. For those we need to find a lower value.
std::cmp::Ordering::Greater | std::cmp::Ordering::Equal => {
right = mid;
mid = self.left + (right - self.left) / 2;
}
}
}
}
}

impl Default for DeletionVector {
fn default() -> Self {
Self::NoDeletions
@@ -241,4 +307,28 @@ mod test {
let dv = DeletionVector::from_iter(0..(BITMAP_THRESDHOLD as u32));
assert!(matches!(dv, DeletionVector::Bitmap(_)));
}

#[test]
fn test_map_offsets() {
let dv = DeletionVector::from_iter(vec![3, 5]);
let mut mapper = OffsetMapper::new(Arc::new(dv));

let offsets = [0, 1, 2, 3, 4, 5, 6];
let mut output = Vec::new();
for offset in offsets.iter() {
output.push(mapper.map_offset(*offset));
}
assert_eq!(output, vec![0, 1, 2, 4, 6, 7, 8]);

let dv = DeletionVector::from_iter(vec![0, 1, 2]);
let mut mapper = OffsetMapper::new(Arc::new(dv));

let offsets = [0, 1, 2, 3, 4, 5, 6];

let mut output = Vec::new();
for offset in offsets.iter() {
output.push(mapper.map_offset(*offset));
}
assert_eq!(output, vec![3, 4, 5, 6, 7, 8, 9]);
}
}
72 changes: 69 additions & 3 deletions rust/lance/src/dataset/take.rs
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@ use futures::{Future, Stream, StreamExt, TryStreamExt};
use lance_arrow::RecordBatchExt;
use lance_core::datatypes::Schema;
use lance_core::utils::address::RowAddress;
use lance_core::utils::deletion::OffsetMapper;
use lance_core::ROW_ADDR;
use lance_datafusion::projection::ProjectionPlan;
use snafu::{location, Location};
@@ -49,9 +50,15 @@ pub async fn take(
} else {
0
};
let mut offset_mapper = if let Some(cur_frag) = cur_frag {
let deletion_vector = cur_frag.get_deletion_vector().await?;
deletion_vector.map(OffsetMapper::new)
} else {
None
};
let mut frag_offset = 0;

let mut addrs = Vec::with_capacity(sorted_offsets.len());
let mut addrs: Vec<u64> = Vec::with_capacity(sorted_offsets.len());
for sorted_offset in sorted_offsets.into_iter() {
while cur_frag.is_some() && sorted_offset >= frag_offset + cur_frag_rows {
frag_offset += cur_frag_rows;
@@ -61,13 +68,23 @@ pub async fn take(
} else {
0
};
offset_mapper = if let Some(cur_frag) = cur_frag {
let deletion_vector = cur_frag.get_deletion_vector().await?;
deletion_vector.map(OffsetMapper::new)
} else {
None
};
}
let Some(cur_frag) = cur_frag else {
addrs.push(RowAddress::TOMBSTONE_ROW);
continue;
};
let row_addr =
RowAddress::new_from_parts(cur_frag.id() as u32, (sorted_offset - frag_offset) as u32);

let mut local_offset = (sorted_offset - frag_offset) as u32;
if let Some(offset_mapper) = &mut offset_mapper {
local_offset = offset_mapper.map_offset(local_offset);
};
let row_addr = RowAddress::new_from_parts(cur_frag.id() as u32, local_offset);
addrs.push(u64::from(row_addr));
}

@@ -626,6 +643,55 @@ mod test {
);
}

#[tokio::test]
async fn test_take_with_deletion() {
let data = test_batch(0..120);
let write_params = WriteParams {
max_rows_per_file: 40,
max_rows_per_group: 10,
..Default::default()
};
let batches = RecordBatchIterator::new([Ok(data.clone())], data.schema());
let mut dataset = Dataset::write(batches, "memory://", Some(write_params))
.await
.unwrap();

dataset.delete("i in (40, 77, 78, 79)").await.unwrap();

let projection = Schema::try_from(data.schema().as_ref()).unwrap();
let values = dataset
.take(
&[
0, // 0
39, // 39
40, // 41
75, // 76
76, // 80
77, // 81
115, // 119
],
projection,
)
.await
.unwrap();

assert_eq!(
RecordBatch::try_new(
data.schema(),
vec![
Arc::new(Int32Array::from_iter_values([0, 39, 41, 76, 80, 81, 119])),
Arc::new(StringArray::from_iter_values(
[0, 39, 41, 76, 80, 81, 119]
.iter()
.map(|v| format!("str-{v}"))
)),
],
)
.unwrap(),
values
);
}

#[rstest]
#[tokio::test]
async fn test_take_with_projection(