Skip to content

Commit

Permalink
Parallelize knn query rewrite across slices rather than segments
Browse files Browse the repository at this point in the history
The concurrent query rewrite for knn vectory query introduced with apache#12160
requests one thread per segment to the executor. To align this with the
IndexSearcher parallel behaviour, we should rather parallelize across
slices. Also, we can reuse the same slice executor instance that the
index searcher already holds, in that way we are using a
QueueSizeBasedExecutor when a thread pool executor is provided.
  • Loading branch information
javanna committed May 24, 2023
1 parent c9c49bc commit 3c9d601
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -81,11 +82,12 @@ public Query rewrite(IndexSearcher indexSearcher) throws IOException {
filterWeight = null;
}

Executor executor = indexSearcher.getExecutor();
IndexSearcher.LeafSlice[] slices = indexSearcher.getSlices();
SliceExecutor sliceExecutor = indexSearcher.getSliceExecutor();
TopDocs[] perLeafResults =
(executor == null)
(sliceExecutor == null)
? sequentialSearch(reader.leaves(), filterWeight)
: parallelSearch(reader.leaves(), filterWeight, executor);
: parallelSearch(slices, filterWeight, sliceExecutor);

// Merge sort the results
TopDocs topK = TopDocs.merge(k, perLeafResults);
Expand All @@ -109,27 +111,38 @@ private TopDocs[] sequentialSearch(
}

private TopDocs[] parallelSearch(
List<LeafReaderContext> leafReaderContexts, Weight filterWeight, Executor executor) {
List<FutureTask<TopDocs>> tasks =
leafReaderContexts.stream()
.map(ctx -> new FutureTask<>(() -> searchLeaf(ctx, filterWeight)))
.toList();
IndexSearcher.LeafSlice[] slices, Weight filterWeight, SliceExecutor sliceExecutor) {

List<FutureTask<TopDocs[]>> tasks = new ArrayList<>(slices.length);
int segmentsCount = 0;
for (IndexSearcher.LeafSlice slice : slices) {
segmentsCount += slice.leaves.length;
tasks.add(new FutureTask<>(() -> {
TopDocs[] results = new TopDocs[slice.leaves.length];
int i = 0;
for (LeafReaderContext context : slice.leaves) {
results[i++] = searchLeaf(context, filterWeight);
}
return results;
}));
}

SliceExecutor sliceExecutor = new SliceExecutor(executor);
sliceExecutor.invokeAll(tasks);

return tasks.stream()
.map(
task -> {
try {
return task.get();
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
} catch (InterruptedException e) {
throw new ThreadInterruptedException(e);
}
})
.toArray(TopDocs[]::new);
TopDocs[] topDocs = new TopDocs[segmentsCount];
int i = 0;
for (FutureTask<TopDocs[]> task : tasks) {
try {
for (TopDocs docs : task.get()) {
topDocs[i++] = docs;
}
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
} catch (InterruptedException e) {
throw new ThreadInterruptedException(e);
}
}
return topDocs;
}

private TopDocs searchLeaf(LeafReaderContext ctx, Weight filterWeight) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,10 @@ public Executor getExecutor() {
return executor;
}

SliceExecutor getSliceExecutor() {
return sliceExecutor;
}

/**
* Thrown when an attempt is made to add more than {@link #getMaxClauseCount()} clauses. This
* typically happens if a PrefixQuery, FuzzyQuery, WildcardQuery, or TermRangeQuery is expanded to
Expand Down

0 comments on commit 3c9d601

Please sign in to comment.