
Revert "Parallelize knn query rewrite across slices rather than segments (#12325)" (#12385)

This reverts commit 10bebde.

Based on a recent discussion in #12183 (comment), we agreed it makes more
sense to parallelize knn query vector rewrite across leaves rather than
leaf slices.
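
As a rough illustration of the granularity change (a hypothetical sketch with
generic types, not the Lucene source), slice-level parallelism creates one
task per group of leaves, while leaf-level parallelism creates one task per
leaf:

import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Function;

class GranularitySketch {
  // Reverted approach: one task per slice; each task iterates its own leaves,
  // so a slice with many leaves keeps a single thread busy for all of them.
  static <LEAF, R> List<Callable<List<R>>> oneTaskPerSlice(
      List<List<LEAF>> slices, Function<LEAF, R> searchLeaf) {
    return slices.stream()
        .<Callable<List<R>>>map(slice -> () -> slice.stream().map(searchLeaf).toList())
        .toList();
  }

  // Restored approach: one task per leaf; each leaf's vector search becomes
  // an independent unit of work, and results come back one per leaf.
  static <LEAF, R> List<Callable<R>> oneTaskPerLeaf(
      List<LEAF> leaves, Function<LEAF, R> searchLeaf) {
    return leaves.stream().<Callable<R>>map(leaf -> () -> searchLeaf.apply(leaf)).toList();
  }
}
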
javanna authored Jun 26, 2023
1 parent cb195bd commit 7f10dca
Showing 2 changed files with 22 additions and 40 deletions.
@@ -19,12 +19,12 @@
 import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.FutureTask;
 import org.apache.lucene.codecs.KnnVectorsReader;
 import org.apache.lucene.index.FieldInfo;
@@ -81,12 +81,11 @@ public Query rewrite(IndexSearcher indexSearcher) throws IOException {
       filterWeight = null;
     }
 
-    SliceExecutor sliceExecutor = indexSearcher.getSliceExecutor();
-    // in case of parallel execution, the leaf results are not ordered by leaf context's ordinal
+    Executor executor = indexSearcher.getExecutor();
     TopDocs[] perLeafResults =
-        (sliceExecutor == null)
+        (executor == null)
             ? sequentialSearch(reader.leaves(), filterWeight)
-            : parallelSearch(indexSearcher.getSlices(), filterWeight, sliceExecutor);
+            : parallelSearch(reader.leaves(), filterWeight, executor);
 
     // Merge sort the results
     TopDocs topK = TopDocs.merge(k, perLeafResults);
@@ -110,40 +109,27 @@ private TopDocs[] sequentialSearch(
   }
 
   private TopDocs[] parallelSearch(
-      IndexSearcher.LeafSlice[] slices, Weight filterWeight, SliceExecutor sliceExecutor) {
-
-    List<FutureTask<TopDocs[]>> tasks = new ArrayList<>(slices.length);
-    int segmentsCount = 0;
-    for (IndexSearcher.LeafSlice slice : slices) {
-      segmentsCount += slice.leaves.length;
-      tasks.add(
-          new FutureTask<>(
-              () -> {
-                TopDocs[] results = new TopDocs[slice.leaves.length];
-                int i = 0;
-                for (LeafReaderContext context : slice.leaves) {
-                  results[i++] = searchLeaf(context, filterWeight);
-                }
-                return results;
-              }));
-    }
+      List<LeafReaderContext> leafReaderContexts, Weight filterWeight, Executor executor) {
+    List<FutureTask<TopDocs>> tasks =
+        leafReaderContexts.stream()
+            .map(ctx -> new FutureTask<>(() -> searchLeaf(ctx, filterWeight)))
+            .toList();
 
+    SliceExecutor sliceExecutor = new SliceExecutor(executor);
     sliceExecutor.invokeAll(tasks);
 
-    TopDocs[] topDocs = new TopDocs[segmentsCount];
-    int i = 0;
-    for (FutureTask<TopDocs[]> task : tasks) {
-      try {
-        for (TopDocs docs : task.get()) {
-          topDocs[i++] = docs;
-        }
-      } catch (ExecutionException e) {
-        throw new RuntimeException(e.getCause());
-      } catch (InterruptedException e) {
-        throw new ThreadInterruptedException(e);
-      }
-    }
-    return topDocs;
+    return tasks.stream()
+        .map(
+            task -> {
+              try {
+                return task.get();
+              } catch (ExecutionException e) {
+                throw new RuntimeException(e.getCause());
+              } catch (InterruptedException e) {
+                throw new ThreadInterruptedException(e);
+              }
+            })
+        .toArray(TopDocs[]::new);
   }
 
   private TopDocs searchLeaf(LeafReaderContext ctx, Weight filterWeight) throws IOException {
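
The restored parallelSearch leans on a plain java.util.concurrent idiom: run
one FutureTask per unit of work, then get() each result, unwrapping
ExecutionException so the task's real failure surfaces. (Lucene rethrows
InterruptedException as its own ThreadInterruptedException; a plain
RuntimeException stands in below.) A self-contained sketch of the idiom,
JDK types only, all names illustrative:

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

class FutureTaskSketch {
  public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      FutureTask<Integer> first = new FutureTask<>(() -> 1 + 1);
      FutureTask<Integer> second = new FutureTask<>(() -> 2 + 2);
      List<FutureTask<Integer>> tasks = List.of(first, second);
      tasks.forEach(executor::execute); // a FutureTask is also a Runnable
      int[] results =
          tasks.stream()
              .mapToInt(
                  task -> {
                    try {
                      return task.get(); // blocks until this task completes
                    } catch (ExecutionException e) {
                      // rethrow the task's actual failure, not the wrapper
                      throw new RuntimeException(e.getCause());
                    } catch (InterruptedException e) {
                      Thread.currentThread().interrupt();
                      throw new RuntimeException(e);
                    }
                  })
              .toArray();
      System.out.println(results[0] + ", " + results[1]); // prints: 2, 4
    } finally {
      executor.shutdown();
    }
  }
}

The second changed file simply drops the now-unused package-private
getSliceExecutor() accessor from IndexSearcher:
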
@@ -962,10 +962,6 @@ public Executor getExecutor() {
     return executor;
   }
 
-  SliceExecutor getSliceExecutor() {
-    return sliceExecutor;
-  }
-
   /**
    * Thrown when an attempt is made to add more than {@link #getMaxClauseCount()} clauses. This
    * typically happens if a PrefixQuery, FuzzyQuery, WildcardQuery, or TermRangeQuery is expanded to
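
With getSliceExecutor() removed, the public way to opt into parallel
execution remains the executor supplied at construction time, exposed via
getExecutor(). A hedged usage sketch (assumes an already-open IndexReader;
the pool size is arbitrary):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.IndexSearcher;

class SearcherSetupSketch {
  // Index creation and reader lifecycle are out of scope here.
  static IndexSearcher newParallelSearcher(IndexReader reader) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    // After this revert, knn query rewrite fans out one task per leaf of
    // reader.leaves() using this executor.
    return new IndexSearcher(reader, pool);
  }
}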
