Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent rewrite for KnnVectorQuery #12160

Merged
merged 5 commits into from
Mar 4, 2023
Merged

Concurrent rewrite for KnnVectorQuery #12160

merged 5 commits into from
Mar 4, 2023

Conversation

kaivalnp
Copy link
Contributor

Description

Issue #11862

Solution

AbstractKnnVectorQuery currently performs HNSW searches (one per-segment) iteratively
Since this is done in a single thread, we can make it concurrent by spinning off per-segment searches to different threads (and make use of available processors)

The actual search is performed in Query#rewrite, and support to allow concurrency there was added recently (#11838) by passing an IndexSearcher (which wraps an IndexReader and Executor)

Proposing to achieve this by CompletableFuture:

  • If the Executor is not set, we can perform a blocking call to CompletableFuture#completedFuture
  • Else we submit the task of per-segment search to the Executor using CompletableFuture#supplyAsync

@kaivalnp
Copy link
Contributor Author

As a follow up here: We can also dive into breaking the per-segment search into sub-tasks, and make use of work-stealing thread pools (if one thread completes search in a smaller graph, it can "steal" some work from another thread that is performing a larger search by picking up some of it's sub-tasks)

One thing to note here would be that sub-tasks must be large enough so that overhead from sub-task creation, thread switching, etc. does not cause performance drops instead

Any suggestions?

Copy link
Contributor

@zhaih zhaih left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on it!
The change looks good, have you tried benchmark it to see how much speed up we can get? (Using luceneutil or whatever benchmark you have)

if (executor == null) {
return CompletableFuture.completedFuture(supplier.get());
} else {
return CompletableFuture.supplyAsync(supplier, executor);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In IndexSearcher we're using SliceExecutor to make sure the main thread is also doing some work but not only wait for joining.
I think we can replicate the same logic here? (Since KNN search is likely to be slow so probably the main thread should do some work as well?)

Maybe we can just use the SliceExecutor from IndexSearcher so that it might also kind of solving the load balancing problem?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the input! This was helpful in reducing latency from thread switching further

@kaivalnp
Copy link
Contributor Author

I ran some benchmarks as well (Columns 2-4 are latency in ms):

enwiki (topK = 100, segment count = 10)

recall Sequential CompletableFuture SliceExecutor nDoc fanout maxConn beamWidth
0.995 1.00 0.31 0.32 10000 0 16 32
0.998 1.33 0.37 0.38 10000 50 16 32
0.998 1.09 0.31 0.32 10000 0 16 64
0.999 1.43 0.37 0.40 10000 50 16 64
0.995 0.99 0.29 0.31 10000 0 32 32
0.998 1.31 0.35 0.37 10000 50 32 32
0.998 1.05 0.29 0.31 10000 0 32 64
0.999 1.40 0.36 0.38 10000 50 32 64
0.987 1.73 0.39 0.41 100000 0 16 32
0.992 2.37 0.49 0.52 100000 50 16 32
0.993 1.92 0.41 0.44 100000 0 16 64
0.996 2.57 0.53 0.56 100000 50 16 64
0.987 1.74 0.39 0.41 100000 0 32 32
0.992 2.34 0.50 0.52 100000 50 32 32
0.994 1.98 0.42 0.45 100000 0 32 64
0.997 2.68 0.54 0.57 100000 50 32 64
0.971 2.76 0.56 0.46 1000000 0 16 32
0.982 3.75 0.72 0.60 1000000 50 16 32
0.985 3.26 0.61 0.55 1000000 0 16 64
0.991 4.24 0.80 0.64 1000000 50 16 64
0.973 2.80 0.58 0.51 1000000 0 32 32
0.983 3.88 0.75 0.62 1000000 50 32 32
0.986 3.33 0.65 0.57 1000000 0 32 64
0.992 4.57 0.85 0.70 1000000 50 32 64

enwiki (topK = 100, segment count = 5)

recall Sequential CompletableFuture SliceExecutor nDoc fanout maxConn beamWidth
0.991 0.59 0.28 0.23 10000 0 16 32
0.996 0.84 0.34 0.27 10000 50 16 32
0.997 0.62 0.28 0.23 10000 0 16 64
0.999 0.89 0.35 0.31 10000 50 16 64
0.991 0.60 0.26 0.21 10000 0 32 32
0.995 0.81 0.35 0.26 10000 50 32 32
0.997 0.65 0.27 0.22 10000 0 32 64
0.999 0.86 0.36 0.29 10000 50 32 64
0.978 0.97 0.36 0.29 100000 0 16 32
0.987 1.29 0.47 0.38 100000 50 16 32
0.989 1.08 0.40 0.31 100000 0 16 64
0.994 1.46 0.52 0.41 100000 50 16 64
0.977 0.98 0.36 0.31 100000 0 32 32
0.987 1.32 0.49 0.37 100000 50 32 32
0.989 1.14 0.40 0.34 100000 0 32 64
0.994 1.55 0.56 0.42 100000 50 32 64
0.957 1.54 0.57 0.45 1000000 0 16 32
0.972 2.07 0.70 0.60 1000000 50 16 32
0.976 1.71 0.61 0.53 1000000 0 16 64
0.985 2.33 0.76 0.65 1000000 50 16 64
0.959 1.58 0.57 0.45 1000000 0 32 32
0.974 2.17 0.73 0.63 1000000 50 32 32
0.978 1.88 0.65 0.53 1000000 0 32 64
0.987 2.57 0.83 0.69 1000000 50 32 64

enwiki (topK = 100, segment count = 1)

recall Sequential CompletableFuture SliceExecutor nDoc fanout maxConn beamWidth
0.941 0.23 0.25 0.21 10000 0 16 32
0.970 0.24 0.27 0.25 10000 50 16 32
0.965 0.19 0.23 0.20 10000 0 16 64
0.984 0.27 0.28 0.28 10000 50 16 64
0.941 0.17 0.20 0.17 10000 0 32 32
0.970 0.23 0.27 0.24 10000 50 32 32
0.966 0.20 0.22 0.20 10000 0 32 64
0.985 0.26 0.29 0.28 10000 50 32 64
0.909 0.26 0.29 0.28 100000 0 16 32
0.945 0.36 0.38 0.38 100000 50 16 32
0.944 0.30 0.32 0.31 100000 0 16 64
0.969 0.42 0.43 0.43 100000 50 16 64
0.914 0.28 0.30 0.29 100000 0 32 32
0.948 0.37 0.43 0.44 100000 50 32 32
0.949 0.31 0.35 0.31 100000 0 32 64
0.972 0.41 0.48 0.43 100000 50 32 64
0.870 0.35 0.38 0.36 1000000 0 16 32
0.911 0.48 0.51 0.50 1000000 50 16 32
0.913 0.40 0.43 0.42 1000000 0 16 64
0.945 0.55 0.59 0.57 1000000 50 16 64
0.881 0.38 0.43 0.39 1000000 0 32 32
0.919 0.52 0.57 0.55 1000000 50 32 32
0.923 0.45 0.50 0.47 1000000 0 32 64
0.954 0.62 0.67 0.65 1000000 50 32 64

I used KnnGraphTester for this benchmark with two changes:

  • Used elapsed time instead of totalCpuTime (which measures time for which the main thread was running). This is important because the main thread may be "waiting" on executor threads to finish, and not count towards overall latency
  • Added an executor: Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()) to the IndexSearcher object, to "make use" of concurrency

Please let me know if this sounds fine?

As expected, the speedup is higher for larger number of segments since they are taken up in parallel. I have also tried to measure the overhead of searching from a single segment

@kaivalnp kaivalnp requested a review from zhaih February 25, 2023 19:19
Copy link
Contributor

@zhaih zhaih left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! The benchmark result looks fantastic!

Copy link
Contributor

@zhaih zhaih left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah forgot to say, please add an CHANGES.txt entry for the change :)

@kaivalnp
Copy link
Contributor Author

Thanks! I wasn't sure where to add the entry..

I have currently put it under Lucene 10 -> Optimizations
Please let me know if you feel this should be someplace else

@kaivalnp kaivalnp requested a review from zhaih February 26, 2023 10:33
Copy link
Contributor

@zhaih zhaih left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, thank you! I'll wait for another one or two days before merging it in case others want to chime in

Copy link
Contributor

@jpountz jpountz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left some minor comments but the change looks good.

} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
} catch (InterruptedException e) {
throw new RuntimeException(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you throw ThreadInterruptedException instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense! This is more suitable

try {
return task.get();
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not confident it's safe to swallow the root exception and only report the cause? Would it work to throw new RuntimeException(e)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think any exception thrown during the thread's execution is (always?) wrapped in an ExecutionException

I mainly used getCause for two reasons:

  • We would always have to throw a RuntimeException(ExecutionException(actual Throwable)), and the ExecutionException might be redundant there
  • LuceneTestCase (currently) allows checking at most two wrapped exceptions, and the one above would have three

However, I don't have any strong opinions on this and can write a function to check for three nested exceptions as well. Please let me know what you feel, and I'll update it accordingly

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank for pointing to that javadoc, it makes sense to me. Maybe we should consider doing the same in IndexSearcher where we also catch ExecutionException (in a follow-up).

expectThrows(IllegalArgumentException.class, () -> searcher.search(kvq, 10));
expectThrows(
RuntimeException.class,
IllegalArgumentException.class,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since IAE extends RuntimeExeption, it should be good to just do expectThrows(RuntimeException.class, runnable)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to preserve the original functionality of the testcase: Checking for illegal arguments
If we only check for the outer class, it may be possible that some other exception was thrown inside (maybe RuntimeException(NullPointerException)), but the test still passed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, that makes sense, I had made a wrong assumption about what expectThrows does with two exception types!

Copy link
Member

@benwtrent benwtrent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One minor comment. We should allow the simple sequential branch to execute when parallelism isn't possible. Keeps us from creating unnecessary objects for clean up and will remove all segment == 1 overhead.

But, this change is nice, and the performance improvements are marvelous.

Comment on lines 84 to 106
List<FutureTask<TopDocs>> tasks =
reader.leaves().stream()
.map(
ctx ->
new FutureTask<>(
() -> {
try {
TopDocs results = searchLeaf(ctx, filterWeight);
if (ctx.docBase > 0) {
for (ScoreDoc scoreDoc : results.scoreDocs) {
scoreDoc.doc += ctx.docBase;
}
}
return results;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}))
.toList();

Executor executor = Objects.requireNonNullElse(indexSearcher.getExecutor(), Runnable::run);
SliceExecutor sliceExecutor = new SliceExecutor(executor);
sliceExecutor.invokeAll(tasks);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like this change, but it seems to be we have a simple optimization opportunity here.

We shouldn't bother with any parallelism if indexSearcher.getExecutor() == null || reader.leaves().size() <= 1. Its a simple if branch that allows us to remove all the overhead associated with parallel rewrites when no parallelism can be achieved.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this is right. Thinking out loud, I would assume that users who leverage IndexSearcher concurrency generally have two thread pools, one that they pass to the IndexSearcher constructor that they expect to do the heavy work, and another one, which is the one where IndexSearcher#search is called, that mostly handles coordination and lightweight work such as merging top hits coming from different shards but generally spends most of its time waiting for work to complete in the other threadpool. Your optimization suggestion boils dow to running some heavy work (a vector search) in the coordinating threadpool when there is a single segment. If heavy work may happen in either threadpool, this makes sizing these threadpools complicated, as either you allocate num_cores threads to the threadpool that does heavy work but then you may end up with more than num_cores threads doing heavy work because some heavy work also happens in the coordinating threadpool., or you allocate less than num_cores threads but then you might not use all your hardware?

That said, your suggestion aligns with how IndexSearcher currently works, so maybe we should apply it for now and discuss in a follow-up issue whether we should also delegate to the executor when there is a single segment.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That said, your suggestion aligns with how IndexSearcher currently works, so maybe we should apply it for now and discuss in a follow-up issue whether we should also delegate to the executor when there is a single segment.

I am fine with that.

I think also that we could only check indexSearcher.getExecutor() == null instead of making a decision for the caller regarding the number of leaves.

So, I would say for now only check if indexSearcher.getExecutor() == null and if it is, do it the old way.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kaivalnp could you (instead of using Runnable::run) just do the regular loop as it was previously if indexSearcher.getExecutor() == null?

If getExecutor() is not null, we should assume the caller wants it used. @jpountz is correct there.

Copy link
Contributor Author

@kaivalnp kaivalnp Feb 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't bother with any parallelism if indexSearcher.getExecutor() == null || reader.leaves().size() <= 1. Its a simple if branch that allows us to remove all the overhead associated with parallel rewrites when no parallelism can be achieved.

I would totally agree with you here! We shouldn't add overhead for non-concurrent executions
If I understand correctly, you are suggesting to add an if block with the condition:

I have tried to implement the changes here. I ran some benchmarks for these (with the executor as null):

enwiki (topK = 100, segment count = 10, executor = null)

recall Sequential SliceExecutor ReduceOverhead nDoc fanout maxConn beamWidth
0.995 0.95 0.96 0.95 10000 0 16 32
0.998 1.26 1.30 1.29 10000 50 16 32
0.998 1.05 1.07 1.07 10000 0 16 64
0.999 1.41 1.43 1.43 10000 50 16 64
0.995 0.98 0.99 0.98 10000 0 32 32
0.998 1.31 1.33 1.34 10000 50 32 32
0.998 0.99 1.01 1.01 10000 0 32 64
0.999 1.33 1.36 1.36 10000 50 32 64
0.987 1.70 1.70 1.71 100000 0 16 32
0.992 2.30 2.30 2.31 100000 50 16 32
0.993 1.92 1.89 1.94 100000 0 16 64
0.996 2.63 2.65 2.64 100000 50 16 64
0.987 1.73 1.70 1.74 100000 0 32 32
0.992 2.34 2.30 2.37 100000 50 32 32
0.994 1.96 1.92 1.98 100000 0 32 64
0.997 2.66 2.61 2.69 100000 50 32 64
0.971 2.72 2.70 2.74 1000000 0 16 32
0.982 3.77 3.79 3.78 1000000 50 16 32
0.985 3.13 3.19 3.19 1000000 0 16 64
0.991 4.34 4.37 4.36 1000000 50 16 64
0.973 2.86 2.94 2.94 1000000 0 32 32
0.983 3.94 3.98 3.97 1000000 50 32 32
0.986 3.38 3.37 3.38 1000000 0 32 64
0.992 4.63 4.66 4.67 1000000 50 32 64

enwiki (topK = 100, segment count = 5, executor = null)

recall Sequential SliceExecutor ReduceOverhead nDoc fanout maxConn beamWidth
0.991 0.59 0.61 0.59 10000 0 16 32
0.996 0.82 0.83 0.81 10000 50 16 32
0.997 0.61 0.62 0.60 10000 0 16 64
0.999 0.88 0.88 0.86 10000 50 16 64
0.991 0.59 0.59 0.58 10000 0 32 32
0.995 0.80 0.81 0.80 10000 50 32 32
0.997 0.64 0.64 0.62 10000 0 32 64
0.999 0.87 0.88 0.89 10000 50 32 64
0.978 1.09 1.08 1.08 100000 0 16 32
0.987 1.29 1.32 1.34 100000 50 16 32
0.989 1.10 1.09 1.10 100000 0 16 64
0.994 1.48 1.49 1.46 100000 50 16 64
0.977 0.98 0.99 0.98 100000 0 32 32
0.987 1.33 1.35 1.34 100000 50 32 32
0.989 1.13 1.14 1.13 100000 0 32 64
0.994 1.55 1.55 1.53 100000 50 32 64
0.957 1.48 1.52 1.49 1000000 0 16 32
0.972 2.03 2.08 2.04 1000000 50 16 32
0.976 1.70 1.73 1.71 1000000 0 16 64
0.985 2.42 2.45 2.47 1000000 50 16 64
0.959 1.67 1.65 1.66 1000000 0 32 32
0.974 2.13 2.15 2.16 1000000 50 32 32
0.978 1.89 1.84 1.89 1000000 0 32 64
0.987 2.52 2.53 2.55 1000000 50 32 64

enwiki (topK = 100, segment count = 1, executor = null)

recall Sequential SliceExecutor ReduceOverhead nDoc fanout maxConn beamWidth
0.941 0.22 0.21 0.24 10000 0 16 32
0.970 0.24 0.24 0.25 10000 50 16 32
0.965 0.20 0.19 0.20 10000 0 16 64
0.984 0.28 0.27 0.28 10000 50 16 64
0.941 0.18 0.17 0.18 10000 0 32 32
0.970 0.24 0.23 0.23 10000 50 32 32
0.966 0.20 0.20 0.20 10000 0 32 64
0.985 0.28 0.27 0.26 10000 50 32 64
0.909 0.27 0.27 0.27 100000 0 16 32
0.945 0.38 0.36 0.37 100000 50 16 32
0.944 0.32 0.30 0.30 100000 0 16 64
0.969 0.43 0.41 0.42 100000 50 16 64
0.914 0.28 0.28 0.29 100000 0 32 32
0.948 0.39 0.38 0.38 100000 50 32 32
0.949 0.30 0.30 0.32 100000 0 32 64
0.972 0.44 0.41 0.40 100000 50 32 64
0.870 0.35 0.34 0.35 1000000 0 16 32
0.911 0.49 0.48 0.47 1000000 50 16 32
0.913 0.40 0.40 0.41 1000000 0 16 64
0.945 0.55 0.55 0.56 1000000 50 16 64
0.881 0.38 0.39 0.38 1000000 0 32 32
0.919 0.52 0.52 0.52 1000000 50 32 32
0.923 0.45 0.45 0.46 1000000 0 32 64
0.954 0.62 0.62 0.61 1000000 50 32 64

There are a few places where it gives some speedup, but this seems to be too low (Note that there is also some logic duplication here and here, which we would want to avoid, maybe by wrapping it in a callable. I tried that out locally and it was performing similar to worse)

In the absence of an executor, we are setting it to Runnable::run, which performs the same tasks sequentially. My guess would be that its overhead is much lower compared to the search tasks, and IMO the readability earlier outweighs the separate if block

Please let me know what you feel / if you had something else in mind?

Edit: Sorry, links in this comment are now broken because they pointed to specific lines at the time of writing. Now that the underlying branch is updated, links point to unrelated places

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that there is also some logic duplication

Duplication of logic right next to each other is fine (IMO). I would keep it simple and duplicate those 4 lines.

I would also change the if statement to only be if(executor == null).

I think the minor if statement is worth it. It creates fewer objects and is a simpler function. It might be more readable if you broke the results gathering into individual private methods.

TopDocs[] gatherPerLeafResults(List<LeafReaderContext>,Weight)

TopDocs[] gatherPerLeafResults(List<LeafReaderContext>,Weight,Executor)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the logic duplication, it just updated the doc ids (by adding ctx.docBase to get index-level doc ids): and I put it in a separate function

I think the minor if statement is worth it. It creates fewer objects and is a simpler function. It might be more readable if you broke the results gathering into individual private methods.

Here are the sample changes, please let me know if these look good: and I'll commit it in this PR

Note that I had to wrap the sequential execution in a try - catch, and wrap exceptions in a RuntimeException for consistency with exceptions thrown during parallel execution (also to pass test cases)

Copy link
Member

@benwtrent benwtrent Mar 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the logic duplication

I wouldn't worry about that. That makes things even more difficult to reason about. I would much rather have a method that takes in the filter weight and leaf contexts and one that takes the same parameters but with an added Executor.

One called where indexSearcher.getExecutor() == null and the other when the executor is provided.
Two methods like this:

  private TopDocs[] gatherLeafResults(
      List<LeafReaderContext> leafReaderContexts, Weight filterWeight) throws IOException {
    TopDocs[] perLeafResults = new TopDocs[leafReaderContexts.size()];
    for (LeafReaderContext ctx : leafReaderContexts) {
      TopDocs results = searchLeaf(ctx, filterWeight);
      if (ctx.docBase > 0) {
        for (ScoreDoc scoreDoc : results.scoreDocs) {
          scoreDoc.doc += ctx.docBase;
        }
      }
      perLeafResults[ctx.ord] = results;
    }
    return perLeafResults;
  }

  private TopDocs[] gatherLeafResults(
      List<LeafReaderContext> leafReaderContexts, Weight filterWeight, Executor executor) {
    List<FutureTask<TopDocs>> tasks =
        leafReaderContexts.stream()
            .map(
                ctx ->
                    new FutureTask<>(
                        () -> {
                          TopDocs results = searchLeaf(ctx, filterWeight);
                          if (ctx.docBase > 0) {
                            for (ScoreDoc scoreDoc : results.scoreDocs) {
                              scoreDoc.doc += ctx.docBase;
                            }
                          }
                          return results;
                        }))
            .toList();

    SliceExecutor sliceExecutor = new SliceExecutor(executor);
    sliceExecutor.invokeAll(tasks);

    return tasks.stream()
        .map(
            task -> {
              try {
                return task.get();
              } catch (ExecutionException e) {
                throw new RuntimeException(e.getCause());
              } catch (InterruptedException e) {
                throw new ThreadInterruptedException(e);
              }
            })
        .toArray(TopDocs[]::new);
  }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the input!
This was really helpful in reducing overhead for non-concurrent search, and improving readability!

Kaival Parikh added 2 commits February 28, 2023 21:26
- Throw more suitable ThreadInterruptedException
- Let FutureTask handle IOException gracefully
- Reduce overhead of non-concurrent search by preserving original execution
- Improve readability by factoring into separate functions
Copy link
Member

@benwtrent benwtrent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dig it!

@benwtrent benwtrent self-assigned this Mar 2, 2023
@benwtrent
Copy link
Member

I foresee backporting this to 9.6. Unless @kaivalnp @jpountz or @zhaih object.

@jpountz
Copy link
Contributor

jpountz commented Mar 2, 2023

+1 to backporting

@zhaih
Copy link
Contributor

zhaih commented Mar 2, 2023

@benwtrent I think the concurrent rewrite API change is only at Lucene10 right now, it is a bit tricky to backport it to 9x and not yet done as discussed in the previous PR I linked.

@benwtrent
Copy link
Member

Thank you for the context @zhaih, Do you mind then guiding this PR through the merging and potential backporting process?

Backporting would indeed be blocked until your change can be backported at some point.

@zhaih
Copy link
Contributor

zhaih commented Mar 2, 2023

Do you mind then guiding this PR through the merging and potential backporting process?

Yeah I can help with that, I would suggest let's keep it in Lucene 10 for now, and if the previous PR is able to be backported to 9x in the future, I'll remember to backport this PR as well (along with the CHANGES.txt entry moving to appropriate place)

@zhaih zhaih merged commit e0d92ee into apache:main Mar 4, 2023
@mkhludnev
Copy link
Member

May this failure relevant to this
https://jenkins.thetaphi.de/job/Lucene-9.x-Linux/8927/testReport/junit/org.apache.lucene.search/TestKnnFloatVectorQuery/testDocAndScoreQueryBasics/

java.lang.IllegalStateException: This DocAndScore query was created by a different reader
	at __randomizedtesting.SeedInfo.seed([125C1E79EB05AAC5:3BA19DF38099A68]:0)
	at org.apache.lucene.search.AbstractKnnVectorQuery$DocAndScoreQuery.createWeight(AbstractKnnVectorQuery.java:298)
	at org.apache.lucene.search.TestKnnFloatVectorQuery.testDocAndScoreQueryBasics(TestKnnFloatVectorQuery.java:204)

??

@kaivalnp
Copy link
Contributor Author

kaivalnp commented Mar 4, 2023

May this failure relevant to this
https://jenkins.thetaphi.de/job/Lucene-9.x-Linux/8927/testReport/junit/org.apache.lucene.search/TestKnnFloatVectorQuery/testDocAndScoreQueryBasics/

I don't think so: The build seems to be for branch_9x (link to job information), which does not have this change
The Revision of the build also shows the latest commit on that branch, where this commit is not present

Please let me know if I'm missing something?

Edit: I see an issue (#12181) for this failure and PR (#12182) to fix it

@kaivalnp kaivalnp deleted the concurrent-knn branch March 6, 2023 05:37
benwtrent pushed a commit to benwtrent/lucene that referenced this pull request May 12, 2023
- Reduce overhead of non-concurrent search by preserving original execution
- Improve readability by factoring into separate functions

---------

Co-authored-by: Kaival Parikh <kaivalp2000@gmail.com>
benwtrent added a commit that referenced this pull request May 12, 2023
* Concurrent rewrite for KnnVectorQuery (#12160)


- Reduce overhead of non-concurrent search by preserving original execution
- Improve readability by factoring into separate functions

---------

Co-authored-by: Kaival Parikh <kaivalp2000@gmail.com>

* adjusting for backport

---------

Co-authored-by: Kaival Parikh <46070017+kaivalnp@users.noreply.github.com>
Co-authored-by: Kaival Parikh <kaivalp2000@gmail.com>
@javanna
Copy link
Contributor

javanna commented May 23, 2023

Question on this change: index searcher parallelizes search over leaf slices, while the concurrent rewrite for knn vector query will request one thread per segment. Is this on purpose or should the concurrent rewrite also leverage the slices for its parallelism?

@zhaih
Copy link
Contributor

zhaih commented May 23, 2023 via email

javanna added a commit to javanna/lucene that referenced this pull request May 24, 2023
The concurrent query rewrite for knn vectory query introduced with apache#12160
requests one thread per segment to the executor. To align this with the
IndexSearcher parallel behaviour, we should rather parallelize across
slices. Also, we can reuse the same slice executor instance that the
index searcher already holds, in that way we are using a
QueueSizeBasedExecutor when a thread pool executor is provided.
@javanna
Copy link
Contributor

javanna commented May 24, 2023

I opened #12325 .

javanna added a commit that referenced this pull request May 26, 2023
)

The concurrent query rewrite for knn vectory query introduced with #12160
requests one thread per segment to the executor. To align this with the
IndexSearcher parallel behaviour, we should rather parallelize across
slices. Also, we can reuse the same slice executor instance that the
index searcher already holds, in that way we are using a
QueueSizeBasedExecutor when a thread pool executor is provided.
javanna added a commit that referenced this pull request May 26, 2023
)

The concurrent query rewrite for knn vectory query introduced with #12160
requests one thread per segment to the executor. To align this with the
IndexSearcher parallel behaviour, we should rather parallelize across
slices. Also, we can reuse the same slice executor instance that the
index searcher already holds, in that way we are using a
QueueSizeBasedExecutor when a thread pool executor is provided.
javanna added a commit that referenced this pull request May 26, 2023
Moved entry for #12160 to 9.7.0 as it's been backported.
Added missing entry for #12325.
@javanna javanna added this to the 9.7.0 milestone May 26, 2023
javanna added a commit to elastic/elasticsearch that referenced this pull request May 31, 2023
Most relevant changes:

- add api to allow concurrent query rewrite (GITHUB-11838 Add api to allow concurrent query rewrite apache/lucene#11840)
- knn query rewrite (Concurrent rewrite for KnnVectorQuery apache/lucene#12160)
- Integrate the incubating Panama Vector API (Integrate the Incubating Panama Vector API  apache/lucene#12311)

As part of this commit I moved the ES codebase off of overriding or relying on the deprecated rewrite(IndexReader) method in favour of using rewrite(IndexSearcher) instead. For score functions, I went for not breaking existing plugins and create a new IndexSearcher whenever we rewrite a filter, otherwise we'd need to change the ScoreFunction#rewrite signature to take a searcher instead of a reader.

Co-authored-by: ChrisHegarty <christopher.hegarty@elastic.co>
hiteshk25 pushed a commit to cowpaths/lucene that referenced this pull request Jul 18, 2023
…dc8ca633e8bcf`) (#20)

* Add next minor version 9.7.0

* Fix SynonymQuery equals implementation (apache#12260)

The term member of TermAndBoost used to be a Term instance and became a
BytesRef with apache#11941, which means its equals impl won't take the field
name into account. The SynonymQuery equals impl needs to be updated
accordingly to take the field into account as well, otherwise synonym
queries with same term and boost across different fields are equal which
is a bug.

* Fix MMapDirectory documentation for Java 20 (apache#12265)

* Don't generate stacktrace in CollectionTerminatedException (apache#12270)

CollectionTerminatedException is always caught and never exposed to users so there's no point in filling
in a stack-trace for it.

* add missing changelog entry for apache#12260

* Add missing author to changelog entry for apache#12220

* Make query timeout members final in ExitableDirectoryReader (apache#12274)

There's a couple of places in the Exitable wrapper classes where
queryTimeout is set within the constructor and never modified. This
commit makes such members final.

* Update javadocs for QueryTimeout (apache#12272)

QueryTimeout was introduced together with ExitableDirectoryReader but is
now also optionally set to the IndexSearcher to wrap the bulk scorer
with a TimeLimitingBulkScorer. Its javadocs needs updating.

* Make TimeExceededException members final (apache#12271)

TimeExceededException has three members that are set within its constructor and never modified. They can be made final.

* DOAP changes for release 9.6.0

* Add back-compat indices for 9.6.0

* `ToParentBlockJoinQuery` Explain Support Score Mode (apache#12245) (apache#12283)

* `ToParentBlockJoinQuery` Explain Support Score Mode

---------

Co-authored-by: Marcus <marcuseagan@gmail.com>

* Simplify SliceExecutor and QueueSizeBasedExecutor (apache#12285)

The only behaviour that QueueSizeBasedExecutor overrides from SliceExecutor is when to execute on the caller thread. There is no need to override the whole invokeAll method for that. Instead, this commit introduces a shouldExecuteOnCallerThread method that can be overridden.

* [Backport] GITHUB-11838 Add api to allow concurrent query rewrite (apache#12197)

* GITHUB-11838 Change API to allow concurrent query rewrite (apache#11840)

Replace Query#rewrite(IndexReader) with Query#rewrite(IndexSearcher)

Co-authored-by: Patrick Zhai <zhaih@users.noreply.github.com>
Co-authored-by: Adrien Grand <jpountz@gmail.com>

Backport of apache#11840

Changes from original:
 - Query keeps `rewrite(IndexReader)`, but it is now deprecated
 - VirtualMethod is used to correct delegate to the overridden methods
 - The changes to `RewriteMethod` type classes are reverted, this increased the backwards compatibility impact. 

------------------------------

### Description
Issue: apache#11838 

#### Updated Proposal
 * Change signature of rewrite to `rewrite(IndexSearcher)`
 * How did I migrate the usage:
   * Use Intellij to do preliminary refactoring for me
   * For test usage, use searcher whenever is available, otherwise create one using `newSearcher(reader)`
   * For very few non-test classes which doesn't have IndexSearcher available but called rewrite, create a searcher using `new IndexSearcher(reader)`, tried my best to avoid creating it recurrently (Especially in `FieldQuery`)
   * For queries who have implemented the rewrite and uses some part of reader's functionality, use shortcut method when possible, otherwise pull out the reader from indexSearcher.

* Backport: Concurrent rewrite for KnnVectorQuery (apache#12160) (apache#12288)

* Concurrent rewrite for KnnVectorQuery (apache#12160)


- Reduce overhead of non-concurrent search by preserving original execution
- Improve readability by factoring into separate functions

---------

Co-authored-by: Kaival Parikh <kaivalp2000@gmail.com>

* adjusting for backport

---------

Co-authored-by: Kaival Parikh <46070017+kaivalnp@users.noreply.github.com>
Co-authored-by: Kaival Parikh <kaivalp2000@gmail.com>

* toposort use iterator to avoid stackoverflow (apache#12286)

Co-authored-by: tangdonghai <tangdonghai@meituan.com>
# Conflicts:
#	lucene/CHANGES.txt

* Fix test to compile with Java 11 after backport of apache#12286

* Update Javadoc for topoSortStates method after apache#12286 (apache#12292)

* Optimize HNSW diversity calculation (apache#12235)

* Minor cleanup and improvements to DaciukMihovAutomatonBuilder (apache#12305)

* GITHUB-12291: Skip blank lines from stopwords list. (apache#12299)

* Wrap Query rewrite backwards layer with AccessController (apache#12308)

* Make sure APIJAR reproduces with different timezone (unfortunately java encodes the date using local timezone) (apache#12315)

* Add multi-thread searchability to OnHeapHnswGraph (apache#12257)

* Fix backport error

* [MINOR] Update javadoc in Query class (apache#12233)

- add a few missing full stops
- update wording in the description of Query#equals method

* [Backport] Integrate the Incubating Panama Vector API apache#12311 (apache#12327)

Leverage accelerated vector hardware instructions in Vector Search.

Lucene already has a mechanism that enables the use of non-final JDK APIs, currently used for the Previewing Pamana Foreign API. This change expands this mechanism to include the Incubating Pamana Vector API. When the jdk.incubator.vector module is present at run time the Panamaized version of the low-level primitives used by Vector Search is enabled. If not present, the default scalar version of these low-level primitives is used (as it was previously).

Currently, we're only targeting support for JDK 20. A subsequent PR should evaluate JDK 21.
---------

Co-authored-by: Uwe Schindler <uschindler@apache.org>
Co-authored-by: Robert Muir <rmuir@apache.org>

* Parallelize knn query rewrite across slices rather than segments (apache#12325)

The concurrent query rewrite for knn vectory query introduced with apache#12160
requests one thread per segment to the executor. To align this with the
IndexSearcher parallel behaviour, we should rather parallelize across
slices. Also, we can reuse the same slice executor instance that the
index searcher already holds, in that way we are using a
QueueSizeBasedExecutor when a thread pool executor is provided.

* Optimize ConjunctionDISI.createConjunction (apache#12328)

This method is showing up as a little hot when profiling some queries.
Almost all the time spent in this method is just burnt on ceremony
around stream indirections that don't inline.
Moving this to iterators, simplifying the check for same doc id and also saving one iteration (for the min
cost) makes this method far cheaper and easier to read.

* Update changes to be correct with ARM (it is called NEON there)

* GH#12321: Marked DaciukMihovAutomatonBuilder as deprecated (apache#12332)

Preparing to reduce visibility of this class in a future release

* add BitSet.clear() (apache#12268)

# Conflicts:
#	lucene/CHANGES.txt

* Clenaup and update changes and synchronize with 9.x

* Update TestVectorUtilProviders.java (apache#12338)

* Don't generate stacktrace for TimeExceededException (apache#12335)

The exception is package private and never rethrown, we can avoid
generating a stacktrace for it.

* Introduced the Word2VecSynonymFilter (apache#12169)

Co-authored-by: Alessandro Benedetti <a.benedetti@sease.io>

* Word2VecSynonymFilter constructor null check (apache#12169)

* Use thread-safe search version of HnswGraphSearcher (apache#12246)

Addressing comment received in the PR apache#12246

* Word2VecSynonymProvider to use standard Integer max value for hnsw searches (apache#12235)
We observed this change was not ported previously from main in an old cherry-pick

* Fix searchafter high latency when after value is out of range for segment (apache#12334)

* Make memory fence in `ByteBufferGuard` explicit (apache#12290)

* Add "direct to binary" option for DaciukMihovAutomatonBuilder and use it in TermInSetQuery#visit (apache#12320)

* Add updateDocuments API which accept a query (reopen) (apache#12346)

* GITHUB#11350: Handle backward compatibility when merging segments with different FieldInfo

This commits restores Lucene 9's ability to handle indices created with Lucene 8 where there are discrepancies in FieldInfos, such as different IndexOptions

* [Tessellator] Improve the checks that validate the diagonal between two polygon nodes (apache#12353)

# Conflicts:
#	lucene/CHANGES.txt

* feat: soft delete optimize (apache#12339)

* Better paging when random reads go backwards (apache#12357)

When reading data from outside the buffer, BufferedIndexInput always resets
its buffer to start at the new read position. If we are reading backwards (for example,
using an OffHeapFSTStore for a terms dictionary) then this can have the effect of
re-reading the same data over and over again.

This commit changes BufferedIndexInput to use paging when reading backwards,
so that if we ask for a byte that is before the current buffer, we read a block of data
of bufferSize that ends at the previous buffer start.

Fixes apache#12356

* Work around SecurityManager issues during initialization of vector api (JDK-8309727) (apache#12362)

* Restrict GraphTokenStreamFiniteStrings#articulationPointsRecurse recursion depth (apache#12249)

* Implement MMapDirectory with Java 21 Project Panama Preview API (apache#12294)
Backport incl JDK21 apijar file with java.util.Objects regenerated

* remove relic in apijar folder caused by vector additions

* Speed up IndexedDISI Sparse #AdvanceExactWithinBlock for tiny step advance (apache#12324)

* Add checks in KNNVectorField / KNNVectorQuery to only allow non-null, non-empty and finite vectors (apache#12281)


---------

Co-authored-by: Uwe Schindler <uschindler@apache.org>

* Implement VectorUtilProvider with Java 21 Project Panama Vector API (apache#12363) (apache#12365)

This commit enables the Panama Vector API for Java 21. The version of
VectorUtilPanamaProvider for Java 21 is identical to that of Java 20.
As such, there is no specific 21 version - the Java 20 version will be
loaded from the MRJAR.

* Add CHANGES.txt for apache#12334 Honor after value for skipping documents even if queue is not full for PagingFieldCollector (apache#12368)

Signed-off-by: gashutos <gashutos@amazon.com>

* Move TermAndBoost back to its original location. (apache#12366)

PR apache#12169 accidentally moved the `TermAndBoost` class to a different location,
which would break custom sub-classes of `QueryBuilder`. This commit moves it
back to its original location.

* GITHUB-12252: Add function queries for computing similarity scores between knn vectors (apache#12253)

Co-authored-by: Alessandro Benedetti <a.benedetti@sease.io>

* hunspell (minor): reduce allocations when processing compound rules (apache#12316)

(cherry picked from commit a454388)

* hunspell (minor): reduce allocations when reading the dictionary's morphological data (apache#12323)

there can be many entries with morph data, so we'd better avoid compiling and matching regexes and even stream allocation

(cherry picked from commit 4bf1b94)

* TestHunspell: reduce the flakiness probability (apache#12351)

* TestHunspell: reduce the flakiness probability

We need to check how the timeout interacts with custom exception-throwing checkCanceled.
The default timeout seems not enough for some CI agents, so let's increase it.

Co-authored-by: Dawid Weiss <dawid.weiss@gmail.com>
(cherry picked from commit 5b63a18)

* This allows VectorUtilProvider tests to be executed although hardware may not fully support vectorization or if C2 is not enabled (apache#12376)

---------

Signed-off-by: gashutos <gashutos@amazon.com>
Co-authored-by: Alan Woodward <romseygeek@apache.org>
Co-authored-by: Luca Cavanna <javanna@apache.org>
Co-authored-by: Uwe Schindler <uschindler@apache.org>
Co-authored-by: Armin Braun <me@obrown.io>
Co-authored-by: Mikhail Khludnev <mkhludnev@users.noreply.github.com>
Co-authored-by: Marcus <marcuseagan@gmail.com>
Co-authored-by: Benjamin Trent <ben.w.trent@gmail.com>
Co-authored-by: Kaival Parikh <46070017+kaivalnp@users.noreply.github.com>
Co-authored-by: Kaival Parikh <kaivalp2000@gmail.com>
Co-authored-by: tang donghai <tangdhcs@gmail.com>
Co-authored-by: Patrick Zhai <zhaih@users.noreply.github.com>
Co-authored-by: Greg Miller <gsmiller@gmail.com>
Co-authored-by: Jerry Chin <metrxqin@gmail.com>
Co-authored-by: Patrick Zhai <zhai7631@gmail.com>
Co-authored-by: Andrey Bozhko <andybozhko@gmail.com>
Co-authored-by: Chris Hegarty <62058229+ChrisHegarty@users.noreply.github.com>
Co-authored-by: Robert Muir <rmuir@apache.org>
Co-authored-by: Jonathan Ellis <jbellis@datastax.com>
Co-authored-by: Daniele Antuzi <daniele.antuzi@gmail.com>
Co-authored-by: Alessandro Benedetti <a.benedetti@sease.io>
Co-authored-by: Chaitanya Gohel <104654647+gashutos@users.noreply.github.com>
Co-authored-by: Petr Portnov | PROgrm_JARvis <pportnov@ozon.ru>
Co-authored-by: Tomas Eduardo Fernandez Lobbe <tflobbe@apache.org>
Co-authored-by: Ignacio Vera <ivera@apache.org>
Co-authored-by: fudongying <30896830+fudongyingluck@users.noreply.github.com>
Co-authored-by: Chris Fournier <chris.fournier@shopify.com>
Co-authored-by: gf2121 <52390227+gf2121@users.noreply.github.com>
Co-authored-by: Adrien Grand <jpountz@gmail.com>
Co-authored-by: Elia Porciani <e.porciani@sease.io>
Co-authored-by: Peter Gromov <peter@jetbrains.com>
benwtrent pushed a commit that referenced this pull request Dec 11, 2023
### Description

Background in #12579

Add support for getting "all vectors within a radius" as opposed to getting the "topK closest vectors" in the current system

### Considerations

I've tried to keep this change minimal and non-invasive by not modifying any APIs and re-using existing HNSW graphs -- changing the graph traversal and result collection criteria to:
1. Visit all nodes (reachable from the entry node in the last level) that are within an outer "traversal" radius
2. Collect all nodes that are within an inner "result" radius

### Advantages

1. Queries that have a high number of "relevant" results will get all of those (not limited by `topK`)
2. Conversely, arbitrary queries where many results are not "relevant" will not waste time in getting all `topK` (when some of them will be removed later)
3. Results of HNSW searches need not be sorted - and we can store them in a plain list as opposed to min-max heaps (saving on `heapify` calls). Merging results from segments is also cheaper, where we just concatenate results as opposed to calculating the index-level `topK`

On a higher level, finding `topK` results needed HNSW searches to happen in `#rewrite` because of an interdependence of results between segments - where we want to find the index-level `topK` from multiple segment-level results. This is kind of against Lucene's concept of segments being independently searchable sub-indexes?

Moreover, we needed explicit concurrency (#12160) to perform these in parallel, and these shortcomings would be naturally overcome with the new objective of finding "all vectors within a radius" - inherently independent of results from another segment (so we can move searches to a more fitting place?)

### Caveats

I could not find much precedent in using HNSW graphs this way (or even the radius-based search for that matter - please add links to existing work if someone is aware) and consequently marked all classes as `@lucene.experimental`

For now I have re-used lots of functionality from `AbstractKnnVectorQuery` to keep this minimal, but if the use-case is accepted more widely we can look into writing more suitable queries (as mentioned above briefly)
benwtrent pushed a commit that referenced this pull request Dec 11, 2023
### Description

Background in #12579

Add support for getting "all vectors within a radius" as opposed to getting the "topK closest vectors" in the current system

### Considerations

I've tried to keep this change minimal and non-invasive by not modifying any APIs and re-using existing HNSW graphs -- changing the graph traversal and result collection criteria to:
1. Visit all nodes (reachable from the entry node in the last level) that are within an outer "traversal" radius
2. Collect all nodes that are within an inner "result" radius

### Advantages

1. Queries that have a high number of "relevant" results will get all of those (not limited by `topK`)
2. Conversely, arbitrary queries where many results are not "relevant" will not waste time in getting all `topK` (when some of them will be removed later)
3. Results of HNSW searches need not be sorted - and we can store them in a plain list as opposed to min-max heaps (saving on `heapify` calls). Merging results from segments is also cheaper, where we just concatenate results as opposed to calculating the index-level `topK`

On a higher level, finding `topK` results needed HNSW searches to happen in `#rewrite` because of an interdependence of results between segments - where we want to find the index-level `topK` from multiple segment-level results. This is kind of against Lucene's concept of segments being independently searchable sub-indexes?

Moreover, we needed explicit concurrency (#12160) to perform these in parallel, and these shortcomings would be naturally overcome with the new objective of finding "all vectors within a radius" - inherently independent of results from another segment (so we can move searches to a more fitting place?)

### Caveats

I could not find much precedent in using HNSW graphs this way (or even the radius-based search for that matter - please add links to existing work if someone is aware) and consequently marked all classes as `@lucene.experimental`

For now I have re-used lots of functionality from `AbstractKnnVectorQuery` to keep this minimal, but if the use-case is accepted more widely we can look into writing more suitable queries (as mentioned above briefly)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants