Commit 2bcba5e

perf: speed up materialization when there is no block list (#1897)
Materialization converts a row id mask into a vector of row ids. The old approach used a FragIdIter, which was too slow. In fact, we probably want to eventually get rid of FragIdIter entirely (see #1896). In the meantime, when there is no block list, we can materialize by filtering the allow list with the fragment list instead. This should greatly speed up indexed searches that don't have any vector search component.
1 parent: 57dee40

File tree: 3 files changed, +159 −21 lines
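The core idea of the change, as a minimal self-contained sketch: instead of walking every row id in every scanned fragment and probing the allow list, prune the allow list down to the scanned fragments and emit its row ids directly. Everything below uses simplified stand-ins (a plain BTreeMap instead of lance's RowIdTreeMap); names and values are illustrative, not the library's API.

```rust
use std::collections::{BTreeMap, HashSet};

/// Row addresses pack (fragment id, row offset) into a u64, as lance does.
fn row_addr(frag_id: u32, offset: u32) -> u64 {
    ((frag_id as u64) << 32) | offset as u64
}

fn main() {
    // Allow list: per-fragment sets of row offsets (stand-in for RowIdTreeMap).
    let mut allow_list: BTreeMap<u32, Vec<u32>> = BTreeMap::new();
    allow_list.insert(0, vec![1, 5, 9]);
    allow_list.insert(7, vec![2, 3]); // fragment 7 is not part of this scan

    // Fragments covered by the scan.
    let fragments: HashSet<u32> = [0, 1].into_iter().collect();

    // Old approach (conceptually): iterate EVERY row id of EVERY scanned
    // fragment and test it against the allow list -- O(total rows).
    // New approach: drop allow-list entries for fragments outside the scan,
    // then emit the surviving row ids directly -- O(size of allow list).
    allow_list.retain(|frag_id, _| fragments.contains(frag_id));
    let ids: Vec<u64> = allow_list
        .iter()
        .flat_map(|(frag, offsets)| offsets.iter().map(|off| row_addr(*frag, *off)))
        .collect();

    assert_eq!(ids, vec![row_addr(0, 1), row_addr(0, 5), row_addr(0, 9)]);
}
```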

python/python/benchmarks/test_search.py (+69 −7)
```diff
@@ -54,8 +54,18 @@ def create_base_dataset(data_dir: Path) -> lance.LanceDataset:
     if dataset:
         return dataset
 
-    table = create_table(NUM_ROWS, offset=0)
-    dataset = lance.write_dataset(table, tmp_path)
+    rows_remaining = NUM_ROWS
+    offset = 0
+    dataset = None
+    while rows_remaining > 0:
+        next_batch_length = min(rows_remaining, 1024 * 1024)
+        rows_remaining -= next_batch_length
+        table = create_table(next_batch_length, offset)
+        if offset == 0:
+            dataset = lance.write_dataset(table, tmp_path)
+        else:
+            dataset = lance.write_dataset(table, tmp_path, mode="append")
+        offset += next_batch_length
 
     dataset.create_index(
         column="vector",
@@ -66,6 +76,8 @@ def create_base_dataset(data_dir: Path) -> lance.LanceDataset:
         num_bits=8,
     )
 
+    dataset.create_scalar_index("filterable", "BTREE")
+
     return dataset
 
 
@@ -208,14 +220,23 @@ def test_filtered_search(test_dataset, benchmark, selectivity, prefilter, use_in
         "filterable < 5000",
         "filterable > 5000",
     ),
+    ids=[
+        "none",
+        "equality",
+        "not_equality",
+        "in_list_one",
+        "not_in_list_one",
+        "not_equality_and_chain",
+        "not_in_list_three",
+        "less_than_selective",
+        "greater_than_not_selective",
+    ],
 )
-def test_scalar_index_prefilter(datasets: Datasets, benchmark, filter: str):
-    dataset = datasets.clean
-    dataset.create_scalar_index("filterable", "BTREE")
+def test_scalar_index_prefilter(test_dataset, benchmark, filter: str):
     q = pc.random(N_DIMS).cast(pa.float32())
     if filter is None:
         benchmark(
-            dataset.to_table,
+            test_dataset.to_table,
             nearest=dict(
                 column="vector",
                 q=q,
@@ -225,7 +246,7 @@ def test_scalar_index_prefilter(datasets: Datasets, benchmark, filter: str):
         )
     else:
         benchmark(
-            dataset.to_table,
+            test_dataset.to_table,
             nearest=dict(
                 column="vector",
                 q=q,
@@ -235,3 +256,44 @@ def test_scalar_index_prefilter(datasets: Datasets, benchmark, filter: str):
             prefilter=True,
             filter=filter,
         )
+
+
+@pytest.mark.benchmark(group="query_no_vec")
+@pytest.mark.parametrize(
+    "filter",
+    (
+        None,
+        "filterable = 0",
+        "filterable != 0",
+        "filterable IN (0)",
+        "filterable IN (0, 5000, 10000)",
+        "filterable NOT IN (0)",
+        "filterable != 0 AND filterable != 5000 AND filterable != 10000",
+        "filterable NOT IN (0, 5000, 10000)",
+        "filterable < 5000",
+        "filterable > 5000",
+    ),
+    ids=[
+        "none",
+        "equality",
+        "not_equality",
+        "in_list_one",
+        "in_list_three",
+        "not_in_list_one",
+        "not_equality_and_chain",
+        "not_in_list_three",
+        "less_than_selective",
+        "greater_than_not_selective",
+    ],
+)
+def test_scalar_index_search(test_dataset, benchmark, filter: str):
+    if filter is None:
+        benchmark(
+            test_dataset.to_table,
+        )
+    else:
+        benchmark(
+            test_dataset.to_table,
+            prefilter=True,
+            filter=filter,
+        )
```

rust/lance-core/src/utils/mask.rs (+46)
```diff
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashSet;
 use std::io::Write;
 use std::{collections::BTreeMap, io::Read};
 
@@ -22,6 +23,8 @@ use roaring::RoaringBitmap;
 
 use crate::Result;
 
+use super::address::RowAddress;
+
 /// A row id mask to select or deselect particular row ids
 ///
 /// If both the allow_list and the block_list are Some then the only selected
@@ -299,6 +302,43 @@ impl RowIdTreeMap {
         self.inner.is_empty()
     }
 
+    /// The number of rows in the map
+    ///
+    /// If there are any "full fragment" items then this is unknown and None is returned
+    pub fn len(&self) -> Option<u64> {
+        self.inner
+            .values()
+            .map(|row_id_selection| match row_id_selection {
+                RowIdSelection::Full => None,
+                RowIdSelection::Partial(indices) => Some(indices.len()),
+            })
+            .try_fold(0_u64, |acc, next| next.map(|next| next + acc))
+    }
+
+    /// An iterator of row ids
+    ///
+    /// If there are any "full fragment" items then this can't be calculated and None
+    /// is returned
+    pub fn row_ids(&self) -> Option<impl Iterator<Item = RowAddress> + '_> {
+        let inner_iters = self
+            .inner
+            .iter()
+            .filter_map(|(frag_id, row_id_selection)| match row_id_selection {
+                RowIdSelection::Full => None,
+                RowIdSelection::Partial(bitmap) => Some(
+                    bitmap
+                        .iter()
+                        .map(|row_offset| RowAddress::new_from_parts(*frag_id, row_offset)),
+                ),
+            })
+            .collect::<Vec<_>>();
+        if inner_iters.len() != self.inner.len() {
+            None
+        } else {
+            Some(inner_iters.into_iter().flatten())
+        }
+    }
+
     /// Add a bitmap for a single fragment
     pub fn insert_bitmap(&mut self, fragment: u32, bitmap: RoaringBitmap) {
         self.inner.insert(fragment, RowIdSelection::Partial(bitmap));
@@ -320,6 +360,12 @@ impl RowIdTreeMap {
         }
     }
 
+    pub fn remove_fragments(&mut self, frag_ids: impl IntoIterator<Item = u32>) {
+        let frag_id_set = frag_ids.into_iter().collect::<HashSet<_>>();
+        self.inner
+            .retain(|frag_id, _| frag_id_set.contains(frag_id));
+    }
+
     /// Compute the serialized size of the set.
     pub fn serialized_size(&self) -> usize {
         // Starts at 4 because of the u32 num_entries
```
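A hedged usage sketch of the new RowIdTreeMap methods above (assumes RowIdTreeMap implements Default; the fragment ids and offsets are made up). One naming caveat worth knowing: remove_fragments retains the listed fragments and removes every other entry, which is exactly what its call sites in scalar_index.rs need.

```rust
use lance_core::utils::mask::RowIdTreeMap;
use roaring::RoaringBitmap;

fn demo() {
    // Two fragments with partial (bitmap) selections. (Assumes Default.)
    let mut map = RowIdTreeMap::default();
    map.insert_bitmap(0, RoaringBitmap::from_iter([1u32, 5, 9]));
    map.insert_bitmap(7, RoaringBitmap::from_iter([2u32, 3]));

    // No fragment is a "full" selection, so the length is known.
    assert_eq!(map.len(), Some(5));

    // Keep only fragments 0 and 1; the entry for fragment 7 is dropped.
    map.remove_fragments([0u32, 1]);

    // row_ids() yields RowAddress values; u64::from turns them into row ids.
    let ids: Vec<u64> = map.row_ids().unwrap().map(u64::from).collect();
    assert_eq!(ids.len(), 3);
}
```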

rust/lance/src/io/exec/scalar_index.rs (+44 −14)
```diff
@@ -33,7 +33,7 @@ use lance_table::format::Fragment;
 use pin_project::pin_project;
 use roaring::RoaringBitmap;
 use snafu::{location, Location};
-use tracing::instrument;
+use tracing::{debug_span, instrument};
 
 use crate::{
     index::{prefilter::PreFilter, DatasetIndexInternalExt},
@@ -257,29 +257,59 @@ impl MaterializeIndexExec {
     ) -> Result<RecordBatch> {
         // TODO: multiple batches, stream without materializing all row ids in memory
         let mask = expr.evaluate(dataset.as_ref());
-        let fragment_bitmap = RoaringBitmap::from_iter(fragments.iter().map(|frag| frag.id as u32));
-        // The user-requested `fragments` is guaranteed to be stricter than the index's fragment
-        // bitmap. This node only runs on indexed fragments and any fragments that were deleted
-        // when the index was trained will still be deleted when the index is queried.
-        let prefilter = PreFilter::create_deletion_mask(dataset.clone(), fragment_bitmap);
+        let span = debug_span!("create_prefilter");
+        let prefilter = span.in_scope(|| {
+            let fragment_bitmap =
+                RoaringBitmap::from_iter(fragments.iter().map(|frag| frag.id as u32));
+            // The user-requested `fragments` is guaranteed to be stricter than the index's fragment
+            // bitmap. This node only runs on indexed fragments and any fragments that were deleted
+            // when the index was trained will still be deleted when the index is queried.
+            PreFilter::create_deletion_mask(dataset.clone(), fragment_bitmap)
+        });
         let mask = if let Some(prefilter) = prefilter {
             let (mask, prefilter) = futures::try_join!(mask, prefilter)?;
             mask.also_block((*prefilter).clone())
         } else {
             mask.await?
         };
-        let ids = match (mask.allow_list, mask.block_list) {
+        let span = debug_span!("make_ids");
+        let ids = span.in_scope(|| match (mask.allow_list, mask.block_list) {
             (None, None) => FragIdIter::new(fragments).collect::<Vec<_>>(),
-            (Some(allow_list), None) => FragIdIter::new(fragments)
-                .filter(|row_id| allow_list.contains(*row_id))
-                .collect(),
+            (Some(mut allow_list), None) => {
+                allow_list.remove_fragments(fragments.iter().map(|frag| frag.id as u32));
+                if let Some(allow_list_iter) = allow_list.row_ids() {
+                    allow_list_iter.map(u64::from).collect::<Vec<_>>()
+                } else {
+                    FragIdIter::new(fragments)
+                        .filter(|row_id| allow_list.contains(*row_id))
+                        .collect()
+                }
+            }
             (None, Some(block_list)) => FragIdIter::new(fragments)
                 .filter(|row_id| !block_list.contains(*row_id))
                 .collect(),
-            (Some(allow_list), Some(block_list)) => FragIdIter::new(fragments)
-                .filter(|row_id| !block_list.contains(*row_id) && allow_list.contains(*row_id))
-                .collect(),
-        };
+            (Some(mut allow_list), Some(block_list)) => {
+                allow_list.remove_fragments(fragments.iter().map(|frag| frag.id as u32));
+                if let Some(allow_list_iter) = allow_list.row_ids() {
+                    allow_list_iter
+                        .filter_map(|addr| {
+                            let row_id = u64::from(addr);
+                            if !block_list.contains(row_id) {
+                                Some(row_id)
+                            } else {
+                                None
+                            }
+                        })
+                        .collect::<Vec<_>>()
+                } else {
+                    FragIdIter::new(fragments)
+                        .filter(|row_id| {
+                            !block_list.contains(*row_id) && allow_list.contains(*row_id)
+                        })
+                        .collect()
+                }
+            }
+        });
         let ids = UInt64Array::from(ids);
         Ok(RecordBatch::try_new(
             MATERIALIZE_INDEX_SCHEMA.clone(),
```
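The debug_span!/in_scope pattern introduced above is how the two synchronous phases (prefilter creation and id materialization) get their own spans in traces. A minimal sketch of the same pattern with placeholder work (assumes only the tracing crate; a subscriber must be installed for the span to be recorded):

```rust
use tracing::debug_span;

fn make_ids(candidates: Vec<u64>) -> Vec<u64> {
    // Everything inside the closure is recorded under the "make_ids" span.
    let span = debug_span!("make_ids");
    span.in_scope(|| candidates.into_iter().filter(|id| id % 2 == 0).collect())
}
```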
