From 3c9d6011d29f471fa15e88a48bbeef141f1bb85b Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Wed, 24 May 2023 12:42:06 +0200 Subject: [PATCH 1/4] Parallelize knn query rewrite across slices rather than segments The concurrent query rewrite for knn vectory query introduced with #12160 requests one thread per segment to the executor. To align this with the IndexSearcher parallel behaviour, we should rather parallelize across slices. Also, we can reuse the same slice executor instance that the index searcher already holds, in that way we are using a QueueSizeBasedExecutor when a thread pool executor is provided. --- .../lucene/search/AbstractKnnVectorQuery.java | 55 ++++++++++++------- .../apache/lucene/search/IndexSearcher.java | 4 ++ 2 files changed, 38 insertions(+), 21 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java b/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java index 04309561cbdd..cfab89528254 100644 --- a/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java +++ b/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java @@ -19,6 +19,7 @@ import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.List; @@ -81,11 +82,12 @@ public Query rewrite(IndexSearcher indexSearcher) throws IOException { filterWeight = null; } - Executor executor = indexSearcher.getExecutor(); + IndexSearcher.LeafSlice[] slices = indexSearcher.getSlices(); + SliceExecutor sliceExecutor = indexSearcher.getSliceExecutor(); TopDocs[] perLeafResults = - (executor == null) + (sliceExecutor == null) ? sequentialSearch(reader.leaves(), filterWeight) - : parallelSearch(reader.leaves(), filterWeight, executor); + : parallelSearch(slices, filterWeight, sliceExecutor); // Merge sort the results TopDocs topK = TopDocs.merge(k, perLeafResults); @@ -109,27 +111,38 @@ private TopDocs[] sequentialSearch( } private TopDocs[] parallelSearch( - List leafReaderContexts, Weight filterWeight, Executor executor) { - List> tasks = - leafReaderContexts.stream() - .map(ctx -> new FutureTask<>(() -> searchLeaf(ctx, filterWeight))) - .toList(); + IndexSearcher.LeafSlice[] slices, Weight filterWeight, SliceExecutor sliceExecutor) { + + List> tasks = new ArrayList<>(slices.length); + int segmentsCount = 0; + for (IndexSearcher.LeafSlice slice : slices) { + segmentsCount += slice.leaves.length; + tasks.add(new FutureTask<>(() -> { + TopDocs[] results = new TopDocs[slice.leaves.length]; + int i = 0; + for (LeafReaderContext context : slice.leaves) { + results[i++] = searchLeaf(context, filterWeight); + } + return results; + })); + } - SliceExecutor sliceExecutor = new SliceExecutor(executor); sliceExecutor.invokeAll(tasks); - return tasks.stream() - .map( - task -> { - try { - return task.get(); - } catch (ExecutionException e) { - throw new RuntimeException(e.getCause()); - } catch (InterruptedException e) { - throw new ThreadInterruptedException(e); - } - }) - .toArray(TopDocs[]::new); + TopDocs[] topDocs = new TopDocs[segmentsCount]; + int i = 0; + for (FutureTask task : tasks) { + try { + for (TopDocs docs : task.get()) { + topDocs[i++] = docs; + } + } catch (ExecutionException e) { + throw new RuntimeException(e.getCause()); + } catch (InterruptedException e) { + throw new ThreadInterruptedException(e); + } + } + return topDocs; } private TopDocs searchLeaf(LeafReaderContext ctx, Weight filterWeight) throws IOException { diff --git a/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java b/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java index 227d16dc7215..fa7bb0e63c68 100644 --- a/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java +++ b/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java @@ -962,6 +962,10 @@ public Executor getExecutor() { return executor; } + SliceExecutor getSliceExecutor() { + return sliceExecutor; + } + /** * Thrown when an attempt is made to add more than {@link #getMaxClauseCount()} clauses. This * typically happens if a PrefixQuery, FuzzyQuery, WildcardQuery, or TermRangeQuery is expanded to From 5f46e1ea1498c5acb4bc943ca443dd24a21431c7 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Wed, 24 May 2023 12:48:50 +0200 Subject: [PATCH 2/4] iter --- .../lucene/search/AbstractKnnVectorQuery.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java b/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java index cfab89528254..98e8d227e624 100644 --- a/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java +++ b/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; import java.util.concurrent.FutureTask; import org.apache.lucene.codecs.KnnVectorsReader; import org.apache.lucene.index.FieldInfo; @@ -82,12 +81,11 @@ public Query rewrite(IndexSearcher indexSearcher) throws IOException { filterWeight = null; } - IndexSearcher.LeafSlice[] slices = indexSearcher.getSlices(); SliceExecutor sliceExecutor = indexSearcher.getSliceExecutor(); TopDocs[] perLeafResults = (sliceExecutor == null) ? sequentialSearch(reader.leaves(), filterWeight) - : parallelSearch(slices, filterWeight, sliceExecutor); + : parallelSearch(indexSearcher.getSlices(), filterWeight, sliceExecutor); // Merge sort the results TopDocs topK = TopDocs.merge(k, perLeafResults); @@ -111,20 +109,22 @@ private TopDocs[] sequentialSearch( } private TopDocs[] parallelSearch( - IndexSearcher.LeafSlice[] slices, Weight filterWeight, SliceExecutor sliceExecutor) { + IndexSearcher.LeafSlice[] slices, Weight filterWeight, SliceExecutor sliceExecutor) { List> tasks = new ArrayList<>(slices.length); int segmentsCount = 0; for (IndexSearcher.LeafSlice slice : slices) { segmentsCount += slice.leaves.length; - tasks.add(new FutureTask<>(() -> { - TopDocs[] results = new TopDocs[slice.leaves.length]; - int i = 0; - for (LeafReaderContext context : slice.leaves) { - results[i++] = searchLeaf(context, filterWeight); - } - return results; - })); + tasks.add( + new FutureTask<>( + () -> { + TopDocs[] results = new TopDocs[slice.leaves.length]; + int i = 0; + for (LeafReaderContext context : slice.leaves) { + results[i++] = searchLeaf(context, filterWeight); + } + return results; + })); } sliceExecutor.invokeAll(tasks); From ca2b8c65360ad9431a0f6fbd173c7ad7764e171b Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Thu, 25 May 2023 20:31:41 +0200 Subject: [PATCH 3/4] add comment --- .../java/org/apache/lucene/search/AbstractKnnVectorQuery.java | 1 + 1 file changed, 1 insertion(+) diff --git a/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java b/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java index 98e8d227e624..7112725c3245 100644 --- a/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java +++ b/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java @@ -82,6 +82,7 @@ public Query rewrite(IndexSearcher indexSearcher) throws IOException { } SliceExecutor sliceExecutor = indexSearcher.getSliceExecutor(); + //in case of parallel execution, the leaf results are not ordered by leaf context's ordinal TopDocs[] perLeafResults = (sliceExecutor == null) ? sequentialSearch(reader.leaves(), filterWeight) From 224364e59f6a65e2aeb5aa06e85f14025190a319 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Thu, 25 May 2023 21:02:32 +0200 Subject: [PATCH 4/4] tidy --- .../java/org/apache/lucene/search/AbstractKnnVectorQuery.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java b/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java index 7112725c3245..cd8d73b8c26f 100644 --- a/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java +++ b/lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java @@ -82,7 +82,7 @@ public Query rewrite(IndexSearcher indexSearcher) throws IOException { } SliceExecutor sliceExecutor = indexSearcher.getSliceExecutor(); - //in case of parallel execution, the leaf results are not ordered by leaf context's ordinal + // in case of parallel execution, the leaf results are not ordered by leaf context's ordinal TopDocs[] perLeafResults = (sliceExecutor == null) ? sequentialSearch(reader.leaves(), filterWeight)