Skip to content

Commit a144028

Browse files
authored
perf: parallelize ngram indexing (#3501)
Total indexing time is reduced from 23s to 5s:

```
ngram_index(1000000)    time:   [5.1192 s 5.1756 s 5.2319 s]
                        change: [-78.163% -77.791% -77.410%] (p = 0.00 < 0.05)
                        Performance has improved.
```

---------

Signed-off-by: BubbleCal <bubble-cal@outlook.com>
1 parent 6194619 commit a144028

File tree

4 files changed

+59
-12
lines changed

4 files changed

+59
-12
lines changed

rust/lance-index/benches/ngram.rs

+15-9
Original file line numberDiff line numberDiff line change
@@ -58,18 +58,24 @@ fn bench_ngram(c: &mut Criterion) {
5858
vec![doc_col, row_id_col],
5959
)
6060
.unwrap();
61-
let stream =
62-
RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch.clone())]));
63-
let stream = Box::pin(stream);
6461

65-
rt.block_on(async {
66-
let mut builder = NGramIndexBuilder::default();
67-
builder.train(stream).await.unwrap();
68-
builder.write(store.as_ref()).await.unwrap();
62+
let batches = (0..1000).map(|i| batch.slice(i * 1000, 1000)).collect_vec();
63+
64+
c.bench_function(format!("ngram_index({TOTAL})").as_str(), |b| {
65+
b.to_async(&rt).iter(|| async {
66+
let stream = RecordBatchStreamAdapter::new(
67+
batch.schema(),
68+
stream::iter(batches.clone().into_iter().map(Ok)),
69+
);
70+
let stream = Box::pin(stream);
71+
let mut builder = NGramIndexBuilder::default();
72+
builder.train(stream).await.unwrap();
73+
builder.write(store.as_ref()).await.unwrap();
74+
})
6975
});
70-
let index = rt.block_on(NGramIndex::load(store)).unwrap();
7176

72-
c.bench_function(format!("invert({TOTAL})").as_str(), |b| {
77+
let index = rt.block_on(NGramIndex::load(store)).unwrap();
78+
c.bench_function(format!("ngram_search({TOTAL})").as_str(), |b| {
7379
b.to_async(&rt).iter(|| async {
7480
let sample_idx = rand::random::<usize>() % batch.num_rows();
7581
let sample = batch

rust/lance-index/src/scalar/inverted.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright The Lance Authors
33

4-
mod builder;
4+
pub mod builder;
55
mod index;
66
mod tokenizer;
77
mod wand;

rust/lance-index/src/scalar/inverted/builder.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ lazy_static! {
5353
// it doesn't mean higher value will result in better performance,
5454
// because the bottleneck can be the IO once the number of shards is large enough,
5555
// it's 8 by default
56-
static ref LANCE_FTS_NUM_SHARDS: usize = std::env::var("LANCE_FTS_NUM_SHARDS")
56+
pub static ref LANCE_FTS_NUM_SHARDS: usize = std::env::var("LANCE_FTS_NUM_SHARDS")
5757
.unwrap_or_else(|_| "8".to_string())
5858
.parse()
5959
.expect("failed to parse LANCE_FTS_NUM_SHARDS");

rust/lance-index/src/scalar/ngram.rs

+42-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::vector::VectorIndex;
2727
use crate::{Index, IndexType};
2828

2929
use super::btree::TrainingSource;
30+
use super::inverted::builder::LANCE_FTS_NUM_SHARDS;
3031
use super::inverted::TokenSet;
3132
use super::{AnyQuery, IndexReader, IndexStore, ScalarIndex, SearchResult, TextQuery};
3233

@@ -465,12 +466,52 @@ impl NGramIndexBuilder {
465466
let schema = data.schema();
466467
Self::validate_schema(schema.as_ref())?;
467468

469+
let num_shards = *LANCE_FTS_NUM_SHARDS;
470+
let mut senders = Vec::with_capacity(num_shards);
471+
let mut builders = Vec::with_capacity(num_shards);
472+
for _ in 0..*LANCE_FTS_NUM_SHARDS {
473+
let (send, mut recv) = tokio::sync::mpsc::channel(2);
474+
senders.push(send);
475+
476+
let mut builder = Self::new();
477+
let future = tokio::spawn(async move {
478+
while let Some(batch) = recv.recv().await {
479+
builder.process_batch(&batch);
480+
}
481+
builder
482+
});
483+
builders.push(future);
484+
}
485+
486+
let mut idx = 0;
468487
while let Some(batch) = data.try_next().await? {
469-
self.process_batch(&batch);
488+
senders[idx % num_shards].send(batch).await.unwrap();
489+
idx += 1;
490+
}
491+
492+
std::mem::drop(senders);
493+
let builders = futures::future::try_join_all(builders).await?;
494+
for builder in builders {
495+
self.merge(builder);
470496
}
497+
471498
Ok(())
472499
}
473500

501+
/// Folds the tokens and bitmaps of `other` into this builder.
///
/// A token that this builder already knows keeps its existing id and has
/// the incoming bitmap OR-ed into its own. A token seen for the first time
/// is assigned the next free id (the current length of `bitmaps`) and its
/// bitmap is appended.
fn merge(&mut self, mut other: Self) {
    for (token, other_id) in other.tokens_map {
        // Move the bitmap out of `other`, leaving a default value in its slot.
        let incoming = std::mem::take(&mut other.bitmaps[other_id as usize]);
        match self.tokens_map.get(&token).copied() {
            Some(existing_id) => {
                self.bitmaps[existing_id as usize] |= incoming;
            }
            None => {
                // This is a new token
                let fresh_id = self.bitmaps.len() as u32;
                self.tokens_map.insert(token, fresh_id);
                self.bitmaps.push(incoming);
            }
        }
    }
}
514+
474515
pub async fn write(self, store: &dyn IndexStore) -> Result<()> {
475516
let mut ordered_tokens = self.tokens_map.into_iter().collect::<Vec<_>>();
476517
ordered_tokens.sort_by_key(|(_, id)| *id);

0 commit comments

Comments
 (0)