From 10bebde26936298c1909dd2ea0b5706b08d2face Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Fri, 26 May 2023 09:17:25 +0200 Subject: [PATCH] Parallelize knn query rewrite across slices rather than segments (#12325) The concurrent query rewrite for knn vectory query introduced with #12160 requests one thread per segment to the executor. To align this with the IndexSearcher parallel behaviour, we should rather parallelize across slices. Also, we can reuse the same slice executor instance that the index searcher already holds, in that way we are using a QueueSizeBasedExecutor when a thread pool executor is provided. --- .../lucene/search/AbstractKnnVectorQuery.java | 58 ++++++++++++------- .../apache/lucene/search/IndexSearcher.java | 4 ++ 2 files changed, 40 insertions(+), 22 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java b/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java index 04309561cbdd..cd8d73b8c26f 100644 --- a/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java +++ b/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java @@ -19,12 +19,12 @@ import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Objects; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; import java.util.concurrent.FutureTask; import org.apache.lucene.codecs.KnnVectorsReader; import org.apache.lucene.index.FieldInfo; @@ -81,11 +81,12 @@ public Query rewrite(IndexSearcher indexSearcher) throws IOException { filterWeight = null; } - Executor executor = indexSearcher.getExecutor(); + SliceExecutor sliceExecutor = indexSearcher.getSliceExecutor(); + // in case of parallel execution, the leaf results are not ordered by leaf context's ordinal TopDocs[] perLeafResults = - (executor == null) + (sliceExecutor == null) ? sequentialSearch(reader.leaves(), filterWeight) - : parallelSearch(reader.leaves(), filterWeight, executor); + : parallelSearch(indexSearcher.getSlices(), filterWeight, sliceExecutor); // Merge sort the results TopDocs topK = TopDocs.merge(k, perLeafResults); @@ -109,27 +110,40 @@ private TopDocs[] sequentialSearch( } private TopDocs[] parallelSearch( - List leafReaderContexts, Weight filterWeight, Executor executor) { - List> tasks = - leafReaderContexts.stream() - .map(ctx -> new FutureTask<>(() -> searchLeaf(ctx, filterWeight))) - .toList(); + IndexSearcher.LeafSlice[] slices, Weight filterWeight, SliceExecutor sliceExecutor) { + + List> tasks = new ArrayList<>(slices.length); + int segmentsCount = 0; + for (IndexSearcher.LeafSlice slice : slices) { + segmentsCount += slice.leaves.length; + tasks.add( + new FutureTask<>( + () -> { + TopDocs[] results = new TopDocs[slice.leaves.length]; + int i = 0; + for (LeafReaderContext context : slice.leaves) { + results[i++] = searchLeaf(context, filterWeight); + } + return results; + })); + } - SliceExecutor sliceExecutor = new SliceExecutor(executor); sliceExecutor.invokeAll(tasks); - return tasks.stream() - .map( - task -> { - try { - return task.get(); - } catch (ExecutionException e) { - throw new RuntimeException(e.getCause()); - } catch (InterruptedException e) { - throw new ThreadInterruptedException(e); - } - }) - .toArray(TopDocs[]::new); + TopDocs[] topDocs = new TopDocs[segmentsCount]; + int i = 0; + for (FutureTask task : tasks) { + try { + for (TopDocs docs : task.get()) { + topDocs[i++] = docs; + } + } catch (ExecutionException e) { + throw new RuntimeException(e.getCause()); + } catch (InterruptedException e) { + throw new ThreadInterruptedException(e); + } + } + return topDocs; } private TopDocs searchLeaf(LeafReaderContext ctx, Weight filterWeight) throws IOException { diff --git a/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java b/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java index 227d16dc7215..fa7bb0e63c68 100644 --- a/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java +++ b/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java @@ -962,6 +962,10 @@ public Executor getExecutor() { return executor; } + SliceExecutor getSliceExecutor() { + return sliceExecutor; + } + /** * Thrown when an attempt is made to add more than {@link #getMaxClauseCount()} clauses. This * typically happens if a PrefixQuery, FuzzyQuery, WildcardQuery, or TermRangeQuery is expanded to