Concurrent rewrite for KnnVectorQuery #12160
Changes from 3 commits
```diff
@@ -19,9 +19,14 @@
 import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;

 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.FutureTask;
 import org.apache.lucene.codecs.KnnVectorsReader;
 import org.apache.lucene.index.FieldInfo;
 import org.apache.lucene.index.IndexReader;
```
```diff
@@ -62,9 +67,8 @@ public AbstractKnnVectorQuery(String field, int k, Query filter) {
   @Override
   public Query rewrite(IndexSearcher indexSearcher) throws IOException {
     IndexReader reader = indexSearcher.getIndexReader();
-    TopDocs[] perLeafResults = new TopDocs[reader.leaves().size()];

-    Weight filterWeight = null;
+    final Weight filterWeight;
     if (filter != null) {
       BooleanQuery booleanQuery =
           new BooleanQuery.Builder()
```
```diff
@@ -73,17 +77,48 @@ public Query rewrite(IndexSearcher indexSearcher) throws IOException {
               .build();
       Query rewritten = indexSearcher.rewrite(booleanQuery);
       filterWeight = indexSearcher.createWeight(rewritten, ScoreMode.COMPLETE_NO_SCORES, 1f);
+    } else {
+      filterWeight = null;
     }

-    for (LeafReaderContext ctx : reader.leaves()) {
-      TopDocs results = searchLeaf(ctx, filterWeight);
-      if (ctx.docBase > 0) {
-        for (ScoreDoc scoreDoc : results.scoreDocs) {
-          scoreDoc.doc += ctx.docBase;
-        }
-      }
-      perLeafResults[ctx.ord] = results;
-    }
+    List<FutureTask<TopDocs>> tasks =
+        reader.leaves().stream()
+            .map(
+                ctx ->
+                    new FutureTask<>(
+                        () -> {
+                          try {
+                            TopDocs results = searchLeaf(ctx, filterWeight);
+                            if (ctx.docBase > 0) {
+                              for (ScoreDoc scoreDoc : results.scoreDocs) {
+                                scoreDoc.doc += ctx.docBase;
+                              }
+                            }
+                            return results;
+                          } catch (IOException e) {
+                            throw new UncheckedIOException(e);
+                          }
+                        }))
+            .toList();
+
+    Executor executor = Objects.requireNonNullElse(indexSearcher.getExecutor(), Runnable::run);
+    SliceExecutor sliceExecutor = new SliceExecutor(executor);
+    sliceExecutor.invokeAll(tasks);
+
+    TopDocs[] perLeafResults =
+        tasks.stream()
+            .map(
+                task -> {
+                  try {
+                    return task.get();
+                  } catch (ExecutionException e) {
+                    throw new RuntimeException(e.getCause());
+                  } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                  }
+                })
+            .toArray(TopDocs[]::new);

     // Merge sort the results
     TopDocs topK = TopDocs.merge(k, perLeafResults);
     if (topK.scoreDocs.length == 0) {
```

Review thread on `throw new RuntimeException(e.getCause());`:

**Reviewer:** I'm not confident it's safe to swallow the root exception and only report the cause? Would it work to […]

**Author (@kaivalnp):** I think any exception thrown during the thread's execution is (always?) wrapped in an […]. I mainly used […]. However, I don't have any strong opinions on this and can write a function to check for three nested exceptions as well. Please let me know what you feel, and I'll update it accordingly.

**Reviewer:** Thanks for pointing to that javadoc, it makes sense to me. Maybe we should consider doing the same in IndexSearcher where we also catch ExecutionException (in a follow-up).

Review thread on `throw new RuntimeException(e);`:

**Reviewer:** Can you throw […]

**Author (@kaivalnp):** Makes sense! This is more suitable.
```diff
@@ -210,7 +210,10 @@ public void testDimensionMismatch() throws IOException {
     IndexSearcher searcher = newSearcher(reader);
     AbstractKnnVectorQuery kvq = getKnnVectorQuery("field", new float[] {0}, 10);
     IllegalArgumentException e =
-        expectThrows(IllegalArgumentException.class, () -> searcher.search(kvq, 10));
+        expectThrows(
+            RuntimeException.class,
+            IllegalArgumentException.class,
+            () -> searcher.search(kvq, 10));
     assertEquals("vector query dimension: 1 differs from field dimension: 2", e.getMessage());
   }
 }
```

Review thread on the `expectThrows` change:

**Reviewer:** Since IAE extends RuntimeException, it should be good to just do […]

**Author (@kaivalnp):** I wanted to preserve the original functionality of the testcase: checking for illegal arguments.

**Reviewer:** Thank you, that makes sense, I had made a wrong assumption about what expectThrows does with two exception types!
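The extra `RuntimeException.class` argument reflects that, on the concurrent path, the `IllegalArgumentException` no longer surfaces directly. A minimal stand-alone illustration (plain Java, not the Lucene test framework; `runAndUnwrap` is a name invented here) of how the original exception ends up as the cause of a wrapper:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class WrappedCauseDemo {
  // Runs a task that throws IllegalArgumentException and returns what get() reports.
  static Throwable runAndUnwrap() throws InterruptedException {
    FutureTask<Void> task = new FutureTask<>(() -> {
      throw new IllegalArgumentException(
          "vector query dimension: 1 differs from field dimension: 2");
    });
    task.run(); // execute inline, as Runnable::run would

    try {
      task.get();
      return null; // unreachable: the task always throws
    } catch (ExecutionException e) {
      // get() wraps the task's exception; the original IAE is the cause.
      return e.getCause();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Throwable cause = runAndUnwrap();
    System.out.println(cause.getClass().getSimpleName() + ": " + cause.getMessage());
  }
}
```

So a test that asserts on the exception's message must first unwrap the outer type to recover the original `IllegalArgumentException`, which is what the two-argument expectation expresses.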
```diff
@@ -495,6 +498,7 @@ public void testRandomWithFilter() throws IOException {
     assertEquals(9, results.totalHits.value);
     assertEquals(results.totalHits.value, results.scoreDocs.length);
     expectThrows(
+        RuntimeException.class,
         UnsupportedOperationException.class,
         () ->
             searcher.search(
```
```diff
@@ -509,6 +513,7 @@ public void testRandomWithFilter() throws IOException {
     assertEquals(5, results.totalHits.value);
     assertEquals(results.totalHits.value, results.scoreDocs.length);
     expectThrows(
+        RuntimeException.class,
         UnsupportedOperationException.class,
         () ->
             searcher.search(
```
```diff
@@ -536,6 +541,7 @@ public void testRandomWithFilter() throws IOException {
     // Test a filter that exhausts visitedLimit in upper levels, and switches to exact search
     Query filter4 = IntPoint.newRangeQuery("tag", lower, lower + 2);
     expectThrows(
+        RuntimeException.class,
         UnsupportedOperationException.class,
         () ->
             searcher.search(
```
```diff
@@ -708,6 +714,7 @@ public void testBitSetQuery() throws IOException {

     Query filter = new ThrowingBitSetQuery(new FixedBitSet(numDocs));
     expectThrows(
+        RuntimeException.class,
         UnsupportedOperationException.class,
         () ->
             searcher.search(
```
Review discussion:

**Reviewer:** I really like this change, but it seems to me we have a simple optimization opportunity here. We shouldn't bother with any parallelism if `indexSearcher.getExecutor() == null || reader.leaves().size() <= 1`. It's a simple `if` branch that allows us to remove all the overhead associated with parallel rewrites when no parallelism can be achieved.

**Reviewer (@jpountz):** I wonder if this is right. Thinking out loud, I would assume that users who leverage IndexSearcher concurrency generally have two thread pools: one that they pass to the IndexSearcher constructor, which they expect to do the heavy work, and another one, where `IndexSearcher#search` is called, that mostly handles coordination and lightweight work such as merging top hits coming from different shards, but generally spends most of its time waiting for work to complete in the other threadpool. Your optimization suggestion boils down to running some heavy work (a vector search) in the coordinating threadpool when there is a single segment. If heavy work may happen in either threadpool, sizing these threadpools becomes complicated: either you allocate `num_cores` threads to the threadpool that does heavy work, but then you may end up with more than `num_cores` threads doing heavy work because some heavy work also happens in the coordinating threadpool; or you allocate fewer than `num_cores` threads, but then you might not use all your hardware. That said, your suggestion aligns with how IndexSearcher currently works, so maybe we should apply it for now and discuss in a follow-up issue whether we should also delegate to the executor when there is a single segment.
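The two-thread-pool arrangement described above can be sketched as follows. The pool sizes and the `coordinate` method are illustrative assumptions for this sketch, not Lucene code:

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TwoPoolsSketch {
  // A coordinating thread submits heavy work to a worker pool sized to the
  // hardware, then blocks merging results; it does little CPU work itself.
  static int coordinate() throws Exception {
    ExecutorService workers =
        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    ExecutorService coordinator = Executors.newFixedThreadPool(1);
    try {
      Future<Integer> merged = coordinator.submit(() -> {
        // Heavy per-segment work runs on the worker pool...
        List<Future<Integer>> parts =
            workers.invokeAll(List.<Callable<Integer>>of(() -> 1, () -> 2, () -> 3));
        int sum = 0;
        for (Future<Integer> part : parts) {
          sum += part.get(); // ...while the coordinator mostly waits here.
        }
        return sum;
      });
      return merged.get();
    } finally {
      workers.shutdown();
      coordinator.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(coordinate());
  }
}
```

The sizing concern in the comment falls out of this shape: only the worker pool needs to match `num_cores`, which breaks down if heavy work sometimes runs on the coordinating pool instead.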
**Reviewer:** I am fine with that. I think also that we could only check `indexSearcher.getExecutor() == null`, instead of making a decision for the caller regarding the number of leaves. So I would say: for now, only check if `indexSearcher.getExecutor() == null`, and if it is, do it the old way.

**Reviewer:** @kaivalnp could you (instead of using `Runnable::run`) just do the regular loop as it was previously if `indexSearcher.getExecutor() == null`? If `getExecutor()` is not null, we should assume the caller wants it used. @jpountz is correct there.
**Author (@kaivalnp):** I would totally agree with you here! We shouldn't add overhead for non-concurrent executions. If I understand correctly, you are suggesting to add an `if` block with the condition […]: if `true`, we perform search as before; `else` we perform the search as per this PR. I have tried to implement the changes here. I ran some benchmarks for these (with the executor as `null`):

- enwiki (topK = 100, segment count = 10, executor = null)
- enwiki (topK = 100, segment count = 5, executor = null)
- enwiki (topK = 100, segment count = 1, executor = null)

There are a few places where it gives some speedup, but this seems to be too low. (Note that there is also some logic duplication here and here, which we would want to avoid, maybe by wrapping it in a callable. I tried that out locally and it performed similar to worse.) In the absence of an executor, we are setting it to `Runnable::run`, which performs the same tasks sequentially. My guess would be that its overhead is much lower compared to the search tasks, and IMO the earlier readability outweighs the separate `if` block. Please let me know what you feel / if you had something else in mind?

Edit: Sorry, the links in this comment are now broken because they pointed to specific lines at the time of writing. Now that the underlying branch is updated, the links point to unrelated places.
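The claim that `Runnable::run` degrades to a plain sequential loop can be checked directly: an `Executor` built from it runs every submitted task immediately, in submission order, on the submitting thread. `runInline` is a name invented for this sketch:

```java
import java.util.concurrent.Executor;

public class DirectExecutorDemo {
  // Submits three tasks to the given executor and records the order in which
  // they run on the submitting thread.
  static String runInline(Executor executor) {
    Thread caller = Thread.currentThread();
    StringBuilder order = new StringBuilder();
    for (int i = 0; i < 3; i++) {
      int id = i;
      executor.execute(() -> {
        if (Thread.currentThread() == caller) {
          order.append(id); // ran synchronously on the caller
        }
      });
    }
    return order.toString();
  }

  public static void main(String[] args) {
    // Runnable::run is a "direct" executor: execute() just calls run().
    System.out.println(runInline(Runnable::run)); // prints "012"
  }
}
```

This is why the remaining overhead on the fallback path is only the `FutureTask` allocations, not any thread hand-off.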
**Reviewer:** Duplication of logic right next to each other is fine (IMO). I would keep it simple and duplicate those 4 lines. I would also change the if statement to only be `if (executor == null)`. I think the minor `if` statement is worth it: it creates fewer objects and is a simpler function. It might be more readable if you broke the results gathering into individual private methods:

- `TopDocs[] gatherPerLeafResults(List<LeafReaderContext>, Weight)`
- `TopDocs[] gatherPerLeafResults(List<LeafReaderContext>, Weight, Executor)`

**Author (@kaivalnp):** For the logic duplication, it just updated the doc ids (by adding `ctx.docBase` to get index-level doc ids), and I put it in a separate function. Here are the sample changes, please let me know if these look good, and I'll commit it in this PR. Note that I had to wrap the sequential execution in a `try`-`catch`, and wrap exceptions in a `RuntimeException`, for consistency with exceptions thrown during parallel execution (also to pass test cases).
**Reviewer:** I wouldn't worry about that. That makes things even more difficult to reason about. I would much rather have a method that takes in the filter weight and leaf contexts, and one that takes the same parameters but with an added Executor — one called where `indexSearcher.getExecutor() == null`, and the other when the executor is provided. Two methods like this: […]

**Author (@kaivalnp):** Thanks for the input! This was really helpful in reducing overhead for non-concurrent search, and improving readability!
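The reviewer's suggested pair of methods (the code block itself was not captured above) might look roughly like the following sketch, with `int` ordinals standing in for `LeafReaderContext`/`Weight`/`TopDocs`. `gatherPerLeafResults` here only mirrors the suggested shape, not Lucene's actual implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;

public class GatherOverloadsSketch {
  // Stand-in for the per-leaf search; ord is the segment ordinal.
  // (Hypothetical: the real method takes a LeafReaderContext and a filter Weight.)
  static int searchLeaf(int ord) {
    return ord * 10; // dummy per-segment result
  }

  // Sequential overload: a plain loop, no task or executor overhead.
  static int[] gatherPerLeafResults(int numLeaves) {
    int[] results = new int[numLeaves];
    for (int ord = 0; ord < numLeaves; ord++) {
      results[ord] = searchLeaf(ord);
    }
    return results;
  }

  // Concurrent overload: one FutureTask per leaf, run on the caller's executor.
  static int[] gatherPerLeafResults(int numLeaves, Executor executor) {
    List<FutureTask<Integer>> tasks = new ArrayList<>();
    for (int ord = 0; ord < numLeaves; ord++) {
      int o = ord;
      FutureTask<Integer> task = new FutureTask<>(() -> searchLeaf(o));
      tasks.add(task);
      executor.execute(task);
    }
    int[] results = new int[numLeaves];
    for (int ord = 0; ord < numLeaves; ord++) {
      try {
        results[ord] = tasks.get(ord).get();
      } catch (ExecutionException e) {
        throw new RuntimeException(e.getCause());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }
    }
    return results;
  }

  public static void main(String[] args) {
    // The caller picks the overload based on whether an executor is configured,
    // so the sequential path never pays for task objects or exception wrapping.
    int[] seq = gatherPerLeafResults(3);
    int[] par = gatherPerLeafResults(3, Runnable::run);
    System.out.println(java.util.Arrays.equals(seq, par));
  }
}
```

Splitting at the method boundary keeps the no-executor path free of `RuntimeException` wrapping entirely, which is what the reviewer preferred over sharing one code path.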