From 7f10dca1e578f38d775e867e47ca748527e002d9 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Mon, 26 Jun 2023 10:41:18 +0200 Subject: [PATCH] Revert "Parallelize knn query rewrite across slices rather than segments (#12325)" (#12385) This reverts commit 10bebde26936298c1909dd2ea0b5706b08d2face. Based on a recent discussion in https://github.com/apache/lucene/pull/12183#discussion_r1235739084 we agreed it makes more sense to parallelize knn query vector rewrite across leaves rather than leaf slices. --- .../lucene/search/AbstractKnnVectorQuery.java | 58 +++++++------------ .../apache/lucene/search/IndexSearcher.java | 4 -- 2 files changed, 22 insertions(+), 40 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java b/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java index eb51b623831a..1ece0ed5a787 100644 --- a/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java +++ b/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java @@ -19,12 +19,12 @@ import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Objects; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.FutureTask; import org.apache.lucene.codecs.KnnVectorsReader; import org.apache.lucene.index.FieldInfo; @@ -81,12 +81,11 @@ public Query rewrite(IndexSearcher indexSearcher) throws IOException { filterWeight = null; } - SliceExecutor sliceExecutor = indexSearcher.getSliceExecutor(); - // in case of parallel execution, the leaf results are not ordered by leaf context's ordinal + Executor executor = indexSearcher.getExecutor(); TopDocs[] perLeafResults = - (sliceExecutor == null) + (executor == null) ? 
sequentialSearch(reader.leaves(), filterWeight) - : parallelSearch(indexSearcher.getSlices(), filterWeight, sliceExecutor); + : parallelSearch(reader.leaves(), filterWeight, executor); // Merge sort the results TopDocs topK = TopDocs.merge(k, perLeafResults); @@ -110,40 +109,27 @@ private TopDocs[] sequentialSearch( } private TopDocs[] parallelSearch( - IndexSearcher.LeafSlice[] slices, Weight filterWeight, SliceExecutor sliceExecutor) { - - List> tasks = new ArrayList<>(slices.length); - int segmentsCount = 0; - for (IndexSearcher.LeafSlice slice : slices) { - segmentsCount += slice.leaves.length; - tasks.add( - new FutureTask<>( - () -> { - TopDocs[] results = new TopDocs[slice.leaves.length]; - int i = 0; - for (LeafReaderContext context : slice.leaves) { - results[i++] = searchLeaf(context, filterWeight); - } - return results; - })); - } + List leafReaderContexts, Weight filterWeight, Executor executor) { + List> tasks = + leafReaderContexts.stream() + .map(ctx -> new FutureTask<>(() -> searchLeaf(ctx, filterWeight))) + .toList(); + SliceExecutor sliceExecutor = new SliceExecutor(executor); sliceExecutor.invokeAll(tasks); - TopDocs[] topDocs = new TopDocs[segmentsCount]; - int i = 0; - for (FutureTask task : tasks) { - try { - for (TopDocs docs : task.get()) { - topDocs[i++] = docs; - } - } catch (ExecutionException e) { - throw new RuntimeException(e.getCause()); - } catch (InterruptedException e) { - throw new ThreadInterruptedException(e); - } - } - return topDocs; + return tasks.stream() + .map( + task -> { + try { + return task.get(); + } catch (ExecutionException e) { + throw new RuntimeException(e.getCause()); + } catch (InterruptedException e) { + throw new ThreadInterruptedException(e); + } + }) + .toArray(TopDocs[]::new); } private TopDocs searchLeaf(LeafReaderContext ctx, Weight filterWeight) throws IOException { diff --git a/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java 
b/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java index fa7bb0e63c68..227d16dc7215 100644 --- a/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java +++ b/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java @@ -962,10 +962,6 @@ public Executor getExecutor() { return executor; } - SliceExecutor getSliceExecutor() { - return sliceExecutor; - } - /** * Thrown when an attempt is made to add more than {@link #getMaxClauseCount()} clauses. This * typically happens if a PrefixQuery, FuzzyQuery, WildcardQuery, or TermRangeQuery is expanded to