Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure Arrow filter and take kernels early out where it makes sense #7704

Merged
merged 2 commits into from
Oct 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions crates/store/re_chunk/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,28 @@ pub fn take_array<A: ArrowArray + Clone, O: arrow2::types::Index>(
"index arrays with validity bits are technically valid, but generally a sign that something went wrong",
);

// Early-out: when `indices` is exactly `0, 1, 2, …, array.len() - 1`, taking is the identity,
// so hand back a clone of `array` instead of running the `take` kernel.
if indices.len() == array.len() {
    let indices = indices.values().as_slice();

    // `first()` rather than `indices[0]`: an empty `array` paired with empty `indices` also
    // reaches this branch, and indexing an empty slice would panic. Taking nothing out of
    // nothing is trivially the identity, so the empty case counts as "starts at zero".
    let starts_at_zero = || indices.first().map_or(true, |first| *first == O::zero());
    let is_consecutive = || {
        indices
            .windows(2)
            .all(|values| values[1] == values[0] + O::one())
    };

    // The cheap check runs first so the linear scan is skipped whenever it fails.
    if starts_at_zero() && is_consecutive() {
        // `array` already has the concrete type `A`: cloning it directly avoids the
        // `as_any`/`downcast_ref` round-trip (and its extra clone and unwrap).
        return array.clone();
    }
}

#[allow(clippy::unwrap_used)]
arrow2::compute::take::take(array, indices)
// Unwrap: this literally cannot fail.
Expand Down
130 changes: 129 additions & 1 deletion crates/store/re_chunk/tests/memory_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use arrow2::{
},
offset::Offsets as ArrowOffsets,
};
use itertools::Itertools as _;
use itertools::Itertools;

#[test]
fn filter_does_allocate() {
Expand Down Expand Up @@ -125,6 +125,72 @@ fn filter_does_allocate() {
}
}

/// Filtering a list array with an all-`true` mask must be (almost) free: the measured size of
/// the filtered result stays tiny and its values buffer starts at the same address as the input's.
///
/// Note: despite the name, only the "full" (keep everything) case is exercised here.
#[test]
fn filter_empty_or_full_is_noop() {
    re_log::setup_logging();

    const NUM_SCALARS: i64 = 10_000_000;

    // Each `memory_use` call yields `(value, size_bytes)`: the outermost one wraps the whole
    // scenario, the inner ones wrap the input construction and the filtering respectively.
    let ((input, output), total_size_bytes) = memory_use(|| {
        let input = memory_use(|| {
            let scalars = ArrowPrimitiveArray::from_vec((0..NUM_SCALARS).collect_vec());
            let datatype = ArrowListArray::<i32>::default_datatype(scalars.data_type().clone());
            // 10 lists of equal length that together span every scalar.
            let offsets = ArrowOffsets::<i32>::try_from_lengths(
                std::iter::repeat(NUM_SCALARS as usize / 10).take(10),
            )
            .unwrap();

            ArrowListArray::<i32>::new(datatype, offsets.into(), scalars.to_boxed(), None)
        });

        // The mask is built outside of the measured closure below on purpose, so that only the
        // filtering itself is accounted for.
        let keep_everything = ArrowBooleanArray::from_slice(vec![true; input.0.len()]);
        let output = memory_use(|| re_chunk::util::filter_array(&input.0, &keep_everything));

        (input, output)
    });
    let (unfiltered, unfiltered_size_bytes) = input;
    let (filtered, filtered_size_bytes) = output;

    eprintln!(
        "unfiltered={} filtered={} total={}",
        re_format::format_bytes(unfiltered_size_bytes as _),
        re_format::format_bytes(filtered_size_bytes as _),
        re_format::format_bytes(total_size_bytes as _),
    );

    assert!(
        filtered_size_bytes < 1_000,
        "filtered array should be the size of a few empty datastructures at most"
    );

    // Compare where the underlying scalar buffers start: a true no-op shares the same memory.
    let values_before = unfiltered
        .values()
        .as_any()
        .downcast_ref::<ArrowPrimitiveArray<i64>>()
        .unwrap();
    let values_after = filtered
        .values()
        .as_any()
        .downcast_ref::<ArrowPrimitiveArray<i64>>()
        .unwrap();

    let ptr_before = values_before.values().as_ptr_range().start;
    let ptr_after = values_after.values().as_ptr_range().start;

    assert!(
        std::ptr::eq(ptr_before, ptr_after),
        "whole thing should be a noop -- pointers should match"
    );
}

#[test]
// TODO(cmc): That's the end goal, but it is simply impossible with `ListArray`'s encoding.
// See `Chunk::take_array`'s doc-comment for more information.
Expand Down Expand Up @@ -191,3 +257,65 @@ fn take_does_not_allocate() {
);
}
}

/// Taking every row of a list array, in order, must be (almost) free: the measured size of the
/// taken result stays tiny and its values buffer starts at the same address as the input's.
///
/// Note: despite the name, only the "full" (take everything) case is exercised here.
#[test]
fn take_empty_or_full_is_noop() {
    re_log::setup_logging();

    const NUM_SCALARS: i64 = 10_000_000;

    // Each `memory_use` call yields `(value, size_bytes)`: the outermost one wraps the whole
    // scenario, the inner ones wrap the input construction and the take respectively.
    let ((input, output), total_size_bytes) = memory_use(|| {
        let input = memory_use(|| {
            let scalars = ArrowPrimitiveArray::from_vec((0..NUM_SCALARS).collect_vec());
            let datatype = ArrowListArray::<i32>::default_datatype(scalars.data_type().clone());
            // 10 lists of equal length that together span every scalar.
            let offsets = ArrowOffsets::<i32>::try_from_lengths(
                std::iter::repeat(NUM_SCALARS as usize / 10).take(10),
            )
            .unwrap();

            ArrowListArray::<i32>::new(datatype, offsets.into(), scalars.to_boxed(), None)
        });

        // Identity selection `0..len`, built outside of the measured closure below on purpose,
        // so that only the take itself is accounted for.
        let num_rows = input.0.len() as i32;
        let every_row = ArrowPrimitiveArray::from_vec((0..num_rows).collect_vec());
        let output = memory_use(|| re_chunk::util::take_array(&input.0, &every_row));

        (input, output)
    });
    let (untaken, untaken_size_bytes) = input;
    let (taken, taken_size_bytes) = output;

    eprintln!(
        "untaken={} taken={} total={}",
        re_format::format_bytes(untaken_size_bytes as _),
        re_format::format_bytes(taken_size_bytes as _),
        re_format::format_bytes(total_size_bytes as _),
    );

    assert!(
        taken_size_bytes < 1_000,
        "taken array should be the size of a few empty datastructures at most"
    );

    // Compare where the underlying scalar buffers start: a true no-op shares the same memory.
    let values_before = untaken
        .values()
        .as_any()
        .downcast_ref::<ArrowPrimitiveArray<i64>>()
        .unwrap();
    let values_after = taken
        .values()
        .as_any()
        .downcast_ref::<ArrowPrimitiveArray<i64>>()
        .unwrap();

    let ptr_before = values_before.values().as_ptr_range().start;
    let ptr_after = values_after.values().as_ptr_range().start;

    assert!(
        std::ptr::eq(ptr_before, ptr_after),
        "whole thing should be a noop -- pointers should match"
    );
}
Loading