Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add early termination support for concurrent segment search #8306

Merged
merged 1 commit into from
Jul 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow mmap to use new JDK-19 preview APIs in Apache Lucene 9.4+ ([#5151](https://github.com/opensearch-project/OpenSearch/pull/5151))
- Add events correlation engine plugin ([#6854](https://github.com/opensearch-project/OpenSearch/issues/6854))
- Add support for ignoring missing Javadoc on generated code using annotation ([#7604](https://github.com/opensearch-project/OpenSearch/pull/7604))
- Add partial results support for concurrent segment search ([#8306](https://github.com/opensearch-project/OpenSearch/pull/8306))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search;

import org.opensearch.common.settings.FeatureFlagSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;

public class ConcurrentSegmentSearchCancellationIT extends SearchCancellationIT {
jed326 marked this conversation as resolved.
Show resolved Hide resolved
@Override
protected Settings featureFlagSettings() {
Settings.Builder featureSettings = Settings.builder();
for (Setting builtInFlag : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) {
featureSettings.put(builtInFlag.getKey(), builtInFlag.getDefaultRaw(Settings.EMPTY));
}
featureSettings.put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true");
return featureSettings.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search;

import org.opensearch.common.settings.FeatureFlagSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;

/**
 * Runs the full {@link SearchTimeoutIT} suite with the concurrent segment
 * search feature flag enabled, so timeout/partial-results behavior is also
 * verified under concurrent segment search.
 */
public class ConcurrentSegmentSearchTimeoutIT extends SearchTimeoutIT {

    @Override
    protected Settings featureFlagSettings() {
        // Seed every built-in feature flag with its default so the only
        // difference from the base suite is the concurrent search flag.
        final Settings.Builder featureSettings = Settings.builder();
        for (Setting<?> builtInFlag : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) {
            featureSettings.put(builtInFlag.getKey(), builtInFlag.getDefaultRaw(Settings.EMPTY));
        }
        // Enable concurrent segment search for every test in this class.
        featureSettings.put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true");
        return featureSettings.build();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
import static org.opensearch.index.query.QueryBuilders.scriptQuery;
import static org.opensearch.search.SearchTimeoutIT.ScriptedTimeoutPlugin.SCRIPT_NAME;
import static org.hamcrest.Matchers.equalTo;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE)
public class SearchTimeoutIT extends OpenSearchIntegTestCase {
Expand All @@ -67,17 +66,37 @@ protected Settings nodeSettings(int nodeOrdinal) {
}

public void testSimpleTimeout() throws Exception {
for (int i = 0; i < 32; i++) {
final int numDocs = 1000;
for (int i = 0; i < numDocs; i++) {
client().prepareIndex("test").setId(Integer.toString(i)).setSource("field", "value").get();
}
refresh("test");

SearchResponse searchResponse = client().prepareSearch("test")
.setTimeout(new TimeValue(10, TimeUnit.MILLISECONDS))
.setTimeout(new TimeValue(5, TimeUnit.MILLISECONDS))
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap())))
.setAllowPartialSearchResults(true)
.get();
assertThat(searchResponse.isTimedOut(), equalTo(true));
assertTrue(searchResponse.isTimedOut());
assertEquals(0, searchResponse.getFailedShards());
assertTrue(numDocs > searchResponse.getHits().getTotalHits().value);
}

/**
 * Indexes a small set of documents and runs the scripted query with an
 * effectively unlimited timeout, asserting the search completes without
 * timing out and returns every document.
 */
public void testSimpleDoesNotTimeout() throws Exception {
    final int docCount = 10;
    int docId = 0;
    while (docId < docCount) {
        client().prepareIndex("test").setId(Integer.toString(docId)).setSource("field", "value").get();
        docId++;
    }
    refresh("test");

    final SearchResponse response = client().prepareSearch("test")
        .setTimeout(new TimeValue(10000, TimeUnit.SECONDS))
        .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap())))
        .setAllowPartialSearchResults(true)
        .get();

    // No timeout, no shard failures, and all documents are returned.
    assertFalse(response.isTimedOut());
    assertEquals(0, response.getFailedShards());
    assertEquals(docCount, response.getHits().getTotalHits().value);
}

public void testPartialResultsIntolerantTimeout() throws Exception {
Expand All @@ -91,7 +110,7 @@ public void testPartialResultsIntolerantTimeout() throws Exception {
.setAllowPartialSearchResults(false) // this line causes timeouts to report failures
.get()
);
assertTrue(ex.toString().contains("Time exceeded"));
assertTrue(ex.toString().contains("QueryPhaseExecutionException[Time exceeded]"));
jed326 marked this conversation as resolved.
Show resolved Hide resolved
}

public static class ScriptedTimeoutPlugin extends MockScriptPlugin {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.opensearch.search.profile.query.ProfileWeight;
import org.opensearch.search.profile.query.QueryProfiler;
import org.opensearch.search.profile.query.QueryTimingType;
import org.opensearch.search.query.QueryPhase;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.search.sort.FieldSortBuilder;
import org.opensearch.search.sort.MinAndMax;
Expand Down Expand Up @@ -103,26 +104,6 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
private MutableQueryTimeout cancellable;
private SearchContext searchContext;

public ContextIndexSearcher(
IndexReader reader,
Similarity similarity,
QueryCache queryCache,
QueryCachingPolicy queryCachingPolicy,
boolean wrapWithExitableDirectoryReader,
Executor executor
) throws IOException {
this(
reader,
similarity,
queryCache,
queryCachingPolicy,
new MutableQueryTimeout(),
wrapWithExitableDirectoryReader,
executor,
null
);
}

jed326 marked this conversation as resolved.
Show resolved Hide resolved
public ContextIndexSearcher(
IndexReader reader,
Similarity similarity,
Expand Down Expand Up @@ -310,18 +291,22 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
return;
}

cancellable.checkCancelled();
weight = wrapWeight(weight);
// See please https://github.com/apache/lucene/pull/964
collector.setWeight(weight);
final LeafCollector leafCollector;
try {
cancellable.checkCancelled();
weight = wrapWeight(weight);
// See please https://github.com/apache/lucene/pull/964
collector.setWeight(weight);
leafCollector = collector.getLeafCollector(ctx);
} catch (CollectionTerminatedException e) {
// there is no doc of interest in this reader context
// continue with the following leaf
return;
} catch (QueryPhase.TimeExceededException e) {
searchContext.setSearchTimedOut(true);
return;
}
// catch early terminated exception and rethrow?
Bits liveDocs = ctx.reader().getLiveDocs();
BitSet liveDocsBitSet = getSparseBitSetOrNull(liveDocs);
if (liveDocsBitSet == null) {
Expand All @@ -332,6 +317,9 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
} catch (CollectionTerminatedException e) {
// collection was terminated prematurely
// continue with the following leaf
} catch (QueryPhase.TimeExceededException e) {
searchContext.setSearchTimedOut(true);
jed326 marked this conversation as resolved.
Show resolved Hide resolved
reta marked this conversation as resolved.
Show resolved Hide resolved
return;
}
}
} else {
Expand All @@ -348,6 +336,9 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
} catch (CollectionTerminatedException e) {
// collection was terminated prematurely
// continue with the following leaf
} catch (QueryPhase.TimeExceededException e) {
searchContext.setSearchTimedOut(true);
return;
}
}
}
Expand Down Expand Up @@ -492,7 +483,7 @@ private boolean canMatch(LeafReaderContext ctx) throws IOException {
}

private boolean canMatchSearchAfter(LeafReaderContext ctx) throws IOException {
if (searchContext != null && searchContext.request() != null && searchContext.request().source() != null) {
if (searchContext.request() != null && searchContext.request().source() != null) {
// Only applied on primary sort field and primary search_after.
FieldSortBuilder primarySortField = FieldSortBuilder.getPrimaryFieldSortOrNull(searchContext.request().source());
if (primarySortField != null) {
Expand All @@ -512,7 +503,7 @@ private boolean shouldReverseLeafReaderContexts() {
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf
// reader order here.
if (searchContext != null && searchContext.indexShard().isTimeSeriesDescSortOptimizationEnabled()) {
if (searchContext.indexShard().isTimeSeriesDescSortOptimizationEnabled()) {
// Only reverse order for asc order sort queries
if (searchContext.sort() != null
&& searchContext.sort().sort != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ public abstract class SearchContext implements Releasable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private InnerHitsContext innerHitsContext;

private volatile boolean searchTimedOut;

protected SearchContext() {}

public abstract void setTask(SearchShardTask task);
Expand All @@ -106,6 +108,14 @@ protected SearchContext() {}

public abstract boolean isCancelled();

/**
 * Returns {@code true} if this search has been marked as timed out via
 * {@link #setSearchTimedOut(boolean)}. The backing field is volatile, so the
 * flag set by one segment-search thread is visible to readers on other threads.
 */
public boolean isSearchTimedOut() {
    return this.searchTimedOut;
}

/**
 * Marks whether this search timed out. Written to a volatile field so that a
 * timeout observed on one concurrent segment-search thread is visible to the
 * thread that reduces/reports the final result.
 *
 * @param searchTimedOut {@code true} if the search exceeded its time budget
 */
public void setSearchTimedOut(boolean searchTimedOut) {
    this.searchTimedOut = searchTimedOut;
}

@Override
public final void close() {
if (closed.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.query.ProfileCollectorManager;
import org.opensearch.search.query.QueryPhase.DefaultQueryPhaseSearcher;
import org.opensearch.search.query.QueryPhase.TimeExceededException;

import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.ExecutionException;

import static org.opensearch.search.query.TopDocsCollectorContext.createTopDocsCollectorContext;

Expand Down Expand Up @@ -80,12 +80,12 @@ private static boolean searchWithCollectorManager(
try {
final ReduceableSearchResult result = searcher.search(query, collectorManager);
result.reduce(queryResult);
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
queryResult.terminatedEarly(true);
jed326 marked this conversation as resolved.
Show resolved Hide resolved
} catch (TimeExceededException e) {
} catch (RuntimeException re) {
jed326 marked this conversation as resolved.
Show resolved Hide resolved
rethrowCauseIfPossible(re, searchContext);
}
if (searchContext.isSearchTimedOut()) {
assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
if (searchContext.request().allowPartialSearchResults() == false) {
// Can't rethrow TimeExceededException because not serializable
throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded");
}
queryResult.searchTimedOut(true);
Expand All @@ -101,4 +101,26 @@ private static boolean searchWithCollectorManager(
public AggregationProcessor aggregationProcessor(SearchContext searchContext) {
return aggregationProcessor;
}

/**
 * Unwraps a {@link RuntimeException} thrown by Lucene's concurrent search
 * execution and rethrows the most specific underlying cause.
 *
 * <ul>
 *   <li>No cause: the exception is rethrown as-is.</li>
 *   <li>Cause is an {@link ExecutionException} or {@link InterruptedException}
 *       (the wrappers used by concurrent slice execution) with its own cause:
 *       that root cause is rethrown.</li>
 *   <li>Otherwise: the cause is wrapped in a {@link QueryPhaseExecutionException}
 *       carrying the shard target for error reporting.</li>
 * </ul>
 *
 * @param re            the runtime exception thrown by the concurrent search
 * @param searchContext context used to attach the shard target when wrapping
 * @throws T the unwrapped root cause, rethrown through the generic parameter
 */
@SuppressWarnings("unchecked") // the root cause is deliberately rethrown through the unchecked generic T
private static <T extends Exception> void rethrowCauseIfPossible(RuntimeException re, SearchContext searchContext) throws T {
    // Rethrow exception if cause is null
    if (re.getCause() == null) {
        throw re;
    }

    // Unwrap the RuntimeException and ExecutionException from Lucene concurrent search method and rethrow
    if (re.getCause() instanceof ExecutionException || re.getCause() instanceof InterruptedException) {
        Throwable t = re.getCause();
        if (t.getCause() != null) {
            throw (T) t.getCause();
        }
    }

    // Rethrow any unexpected exception types
    throw new QueryPhaseExecutionException(
        searchContext.shardTarget(),
        "Failed to execute concurrent segment search thread",
        re.getCause()
    );
}
reta marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -354,10 +354,10 @@ private static boolean searchWithCollector(
searcher.search(query, queryCollector);
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
queryResult.terminatedEarly(true);
} catch (TimeExceededException e) {
}
if (searchContext.isSearchTimedOut()) {
assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
if (searchContext.request().allowPartialSearchResults() == false) {
// Can't rethrow TimeExceededException because not serializable
throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded");
}
queryResult.searchTimedOut(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@
import org.apache.lucene.util.automaton.CompiledAutomaton;
import org.apache.lucene.util.automaton.RegExp;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.tasks.TaskCancelledException;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.AfterClass;
Expand All @@ -59,6 +61,8 @@
import java.util.concurrent.atomic.AtomicBoolean;

import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class SearchCancellationTests extends OpenSearchTestCase {

Expand Down Expand Up @@ -109,7 +113,8 @@ public void testAddingCancellationActions() throws IOException {
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
true,
null
null,
mock(SearchContext.class)
);
NullPointerException npe = expectThrows(NullPointerException.class, () -> searcher.addQueryCancellation(null));
assertEquals("cancellation runnable should not be null", npe.getMessage());
Expand All @@ -123,13 +128,17 @@ public void testAddingCancellationActions() throws IOException {
public void testCancellableCollector() throws IOException {
TotalHitCountCollector collector1 = new TotalHitCountCollector();
Runnable cancellation = () -> { throw new TaskCancelledException("cancelled"); };
SearchContext searchContext = mock(SearchContext.class);
IndexShard indexShard = mock(IndexShard.class);
when(searchContext.indexShard()).thenReturn(indexShard);
ContextIndexSearcher searcher = new ContextIndexSearcher(
reader,
IndexSearcher.getDefaultSimilarity(),
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
true,
null
null,
searchContext
);

searcher.search(new MatchAllDocsQuery(), collector1);
Expand Down Expand Up @@ -157,7 +166,8 @@ public void testExitableDirectoryReader() throws IOException {
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
true,
null
null,
mock(SearchContext.class)
);
searcher.addQueryCancellation(cancellation);
CompiledAutomaton automaton = new CompiledAutomaton(new RegExp("a.*").toAutomaton());
Expand Down
Loading