Skip to content

Commit 711bad7

Browse files
authored
perf: coalesce ids before executing take (#2680)
Late materialization is a great benefit when executing a highly selective filter. However, if a filter is highly selective, it means that each input batch will probably only have a few matching rows. The current implementation executes `take` for each filtered batch. E.g. instead of a single call of `take(500, 10000, 300000)` we get three calls: `take(500)`, `take(10000)`, and `take(300000)`. This means:

* We can't coalesce
* More CPU overhead (many calls to `take_ranges`)
* Very small output batches (the user's batch size is not respected)

On cloud storage I see a 10x+ benefit in scan performance. We have a benchmark for this (EDA search plot 4) which should assist with preventing regression in the future: https://bencher.dev/console/projects/weston-lancedb/plots
1 parent 8cfb6a5 commit 711bad7

File tree

4 files changed

+204
-125
lines changed

4 files changed

+204
-125
lines changed

python/python/benchmarks/test_search.py

+28-2
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ def create_table(num_rows, offset) -> pa.Table:
5858
"vector": vectors,
5959
"filterable": filterable,
6060
"category": categories,
61+
"category_no_index": categories,
6162
"genres": genres,
6263
}
6364
)
@@ -77,9 +78,11 @@ def create_base_dataset(data_dir: Path) -> lance.LanceDataset:
7778
rows_remaining -= next_batch_length
7879
table = create_table(next_batch_length, offset)
7980
if offset == 0:
80-
dataset = lance.write_dataset(table, tmp_path)
81+
dataset = lance.write_dataset(table, tmp_path, use_legacy_format=False)
8182
else:
82-
dataset = lance.write_dataset(table, tmp_path, mode="append")
83+
dataset = lance.write_dataset(
84+
table, tmp_path, mode="append", use_legacy_format=False
85+
)
8386
offset += next_batch_length
8487

8588
dataset.create_index(
@@ -479,3 +482,26 @@ def test_label_list_index_prefilter(test_dataset, benchmark, filter: str):
479482
prefilter=True,
480483
filter=filter,
481484
)
485+
486+
487+
@pytest.mark.benchmark(group="late_materialization")
488+
@pytest.mark.parametrize(
489+
"use_index",
490+
(False, True),
491+
ids=["no_index", "with_index"],
492+
)
493+
def test_late_materialization(test_dataset, benchmark, use_index):
494+
column = "category" if use_index else "category_no_index"
495+
print(
496+
test_dataset.scanner(
497+
columns=["vector"],
498+
filter=f"{column} = 0",
499+
batch_size=32,
500+
).explain_plan(True)
501+
)
502+
benchmark(
503+
test_dataset.to_table,
504+
columns=["vector"],
505+
filter=f"{column} = 0",
506+
batch_size=32,
507+
)

python/python/tests/test_dataset.py

+12
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import lance
1818
import lance.fragment
19+
import numpy as np
1920
import pandas as pd
2021
import pandas.testing as tm
2122
import polars as pl
@@ -1994,3 +1995,14 @@ def test_legacy_dataset(tmp_path: Path):
19941995

19951996
fragment = list(dataset.get_fragments())[0]
19961997
assert "minor_version: 3" not in format_fragment(fragment.metadata, dataset)
1998+
1999+
2000+
def test_late_materialization_batch_size(tmp_path: Path):
2001+
table = pa.table({"filter": np.arange(32 * 32), "values": np.arange(32 * 32)})
2002+
dataset = lance.write_dataset(
2003+
table, tmp_path, data_storage_version="stable", max_rows_per_file=10000
2004+
)
2005+
for batch in dataset.to_batches(
2006+
columns=["values"], filter="filter % 2 == 0", batch_size=32
2007+
):
2008+
assert batch.num_rows == 32

rust/lance-encoding/src/decoder.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -920,7 +920,13 @@ impl DecodeBatchScheduler {
920920
mut schedule_action: impl FnMut(Result<DecoderMessage>),
921921
) {
922922
let rows_requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
923-
trace!("Scheduling ranges {:?} ({} rows)", ranges, rows_requested);
923+
trace!(
924+
"Scheduling {} ranges across {}..{} ({} rows)",
925+
ranges.len(),
926+
ranges.first().unwrap().start,
927+
ranges.last().unwrap().end,
928+
rows_requested
929+
);
924930

925931
let mut context = SchedulerContext::new(io);
926932
let maybe_root_job = self.root_scheduler.schedule_ranges(ranges, filter);

0 commit comments

Comments
 (0)