Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce executor for concurrent search #98204

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
20e6dbe
Introduce executor for concurrent search
javanna Aug 4, 2023
01733dc
Update docs/changelog/98204.yaml
javanna Aug 4, 2023
9229f4d
forbidden api
javanna Aug 4, 2023
f67551f
grrrrrrrrrr
javanna Aug 4, 2023
db6e906
leftover
javanna Aug 4, 2023
3b86ab1
forbidden api
javanna Aug 4, 2023
438d76d
spotless
javanna Aug 4, 2023
ad260e0
Update server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
javanna Aug 4, 2023
5a8ec56
iter
javanna Aug 4, 2023
9069bfe
iter
javanna Aug 7, 2023
026258c
iter
javanna Aug 7, 2023
fed2aaf
Merge branch 'main' into enhancement/bounded_executor_concurrent_search
javanna Aug 7, 2023
6688e92
iter
javanna Aug 7, 2023
cfcc819
iter
javanna Aug 7, 2023
d2a982b
special case for no segments
javanna Aug 7, 2023
0b2b243
spotless
javanna Aug 8, 2023
103363f
terminate thread pool in test
javanna Aug 8, 2023
b44a239
Merge branch 'main' into enhancement/bounded_executor_concurrent_search
javanna Aug 8, 2023
4220d6a
fix thread pool executor cast
javanna Aug 8, 2023
89faa2b
iter
javanna Aug 8, 2023
9ec1fd5
Merge branch 'main' into enhancement/bounded_executor_concurrent_search
javanna Aug 8, 2023
eced2c8
iter
javanna Aug 8, 2023
2844a05
iter
javanna Aug 8, 2023
5959f83
remove TODO and clarify comments
javanna Aug 8, 2023
ee55047
clarify docs
javanna Aug 8, 2023
8c77ed6
spotless
javanna Aug 8, 2023
af76ab6
iter
javanna Aug 8, 2023
262996d
Remove supportsOffloadingSequentialCollection() method and do aggrega…
martijnvg Aug 9, 2023
bb4ffca
Also postCollect when there are no slices
martijnvg Aug 9, 2023
c3d8034
address tests
martijnvg Aug 9, 2023
151b93b
spotless
martijnvg Aug 9, 2023
23121e3
Correctly overwrite supportsParallelCollection(...)
martijnvg Aug 10, 2023
bca4050
fixed unit tests
martijnvg Aug 10, 2023
df0ff91
Also perform postCollection when executing search in sort order.
martijnvg Aug 10, 2023
9b92a0b
Comment
javanna Aug 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/98204.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 98204
summary: Introduce executor for concurrent search
area: Search
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ public void apply(Settings value, Settings current, Settings previous) {
ResourceWatcherService.RELOAD_INTERVAL_LOW,
SearchModule.INDICES_MAX_CLAUSE_COUNT_SETTING,
SearchModule.INDICES_MAX_NESTED_DEPTH_SETTING,
SearchModule.SEARCH_CONCURRENCY_ENABLED,
SearchService.SEARCH_WORKER_THREADS_ENABLED,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have renamed the escape hatch to disable concurrency to align it with the name of the new thread pool, as it effectively affects whether the new thread pool is enabled or not.

ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING,
ThreadPool.LATE_TIME_INTERVAL_WARN_THRESHOLD_SETTING,
ThreadPool.SLOW_SCHEDULER_TASK_WARN_THRESHOLD_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -127,11 +128,24 @@ public static EsThreadPoolExecutor newFixed(
TaskTrackingConfig config
) {
BlockingQueue<Runnable> queue;
if (queueCapacity < 0) {
if (queueCapacity == -128) {
queue = new SynchronousQueue<>(true);
} else if (queueCapacity < 0) {
queue = ConcurrentCollections.newBlockingQueue();
} else {
queue = new SizeBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), queueCapacity);
}
return newFixed(name, size, queue, threadFactory, contextHolder, config);
}

private static EsThreadPoolExecutor newFixed(
String name,
int size,
BlockingQueue<Runnable> queue,
ThreadFactory threadFactory,
ThreadContext contextHolder,
TaskTrackingConfig config
) {
if (config.trackExecutionTime()) {
return new TaskExecutionTimeTrackingEsThreadPoolExecutor(
name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.LongSupplier;

final class DefaultSearchContext extends SearchContext {
Expand Down Expand Up @@ -136,10 +138,11 @@ final class DefaultSearchContext extends SearchContext {
SearchShardTarget shardTarget,
LongSupplier relativeTimeSupplier,
TimeValue timeout,
int minimumDocsPerSlice,
FetchPhase fetchPhase,
boolean lowLevelCancellation,
boolean parallelize
Executor executor,
boolean forceSequentialCollection,
int minimumDocsPerSlice
) throws IOException {
this.readerContext = readerContext;
this.request = request;
Expand All @@ -150,19 +153,28 @@ final class DefaultSearchContext extends SearchContext {
this.indexShard = readerContext.indexShard();

Engine.Searcher engineSearcher = readerContext.acquireSearcher("search");
this.searcher = new ContextIndexSearcher(
engineSearcher.getIndexReader(),
engineSearcher.getSimilarity(),
engineSearcher.getQueryCache(),
engineSearcher.getQueryCachingPolicy(),
minimumDocsPerSlice,
lowLevelCancellation,
// TODO not set the for now, this needs a special thread pool and can be enabled after its introduction
// parallelize
// ? (EsThreadPoolExecutor) this.indexService.getThreadPool().executor(ThreadPool.Names.CONCURRENT_COLLECTION_TBD)
// : null,
null
);
if (executor == null) {
this.searcher = new ContextIndexSearcher(
engineSearcher.getIndexReader(),
engineSearcher.getSimilarity(),
engineSearcher.getQueryCache(),
engineSearcher.getQueryCachingPolicy(),
lowLevelCancellation
);
} else {
this.searcher = new ContextIndexSearcher(
engineSearcher.getIndexReader(),
engineSearcher.getSimilarity(),
engineSearcher.getQueryCache(),
engineSearcher.getQueryCachingPolicy(),
lowLevelCancellation,
executor,
forceSequentialCollection,
minimumDocsPerSlice,
// TODO is there a better way that does not require casting the executor?
((ThreadPoolExecutor) executor).getMaximumPoolSize()
);
}
releasables.addAll(List.of(engineSearcher, searcher));

this.relativeTimeSupplier = relativeTimeSupplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,13 +281,6 @@ public class SearchModule {
Setting.Property.NodeScope
);

public static final Setting<Boolean> SEARCH_CONCURRENCY_ENABLED = Setting.boolSetting(
"search.concurrency_enabled",
true,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this to SearchService together with the other existing search settings (including minimum docs per slice which affects search concurrency)

private final Map<String, Highlighter> highlighters;

private final List<FetchSubPhase> fetchSubPhases = new ArrayList<>();
Expand Down
33 changes: 19 additions & 14 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -141,7 +142,6 @@
import static org.elasticsearch.core.TimeValue.timeValueMillis;
import static org.elasticsearch.core.TimeValue.timeValueMinutes;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.elasticsearch.search.SearchModule.SEARCH_CONCURRENCY_ENABLED;

public class SearchService extends AbstractLifecycleComponent implements IndexEventListener {
private static final Logger logger = LogManager.getLogger(SearchService.class);
Expand Down Expand Up @@ -211,6 +211,13 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Property.NodeScope
);

public static final Setting<Boolean> SEARCH_WORKER_THREADS_ENABLED = Setting.boolSetting(
"search.worker_threads_enabled",
true,
Property.NodeScope,
Property.Dynamic
);

public static final Setting<Integer> MAX_OPEN_SCROLL_CONTEXT = Setting.intSetting(
"search.max_open_scroll_context",
500,
Expand Down Expand Up @@ -253,7 +260,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
private final DfsPhase dfsPhase = new DfsPhase();

private final FetchPhase fetchPhase;
private volatile boolean enableConcurrentCollection;
private volatile boolean enableSearchWorkerThreads;

private volatile long defaultKeepAlive;

Expand Down Expand Up @@ -344,16 +351,12 @@ public SearchService(
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(ENABLE_REWRITE_AGGS_TO_FILTER_BY_FILTER, this::setEnableRewriteAggsToFilterByFilter);

enableConcurrentCollection = SEARCH_CONCURRENCY_ENABLED.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(SEARCH_CONCURRENCY_ENABLED, this::setEnableConcurrentCollection);
}

private void setEnableConcurrentCollection(boolean concurrentCollection) {
this.enableConcurrentCollection = concurrentCollection;
enableSearchWorkerThreads = SEARCH_WORKER_THREADS_ENABLED.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(SEARCH_WORKER_THREADS_ENABLED, this::setEnableSearchWorkerThreads);
}

boolean isConcurrentCollectionEnabled() {
return this.enableConcurrentCollection;
private void setEnableSearchWorkerThreads(boolean enableSearchWorkerThreads) {
this.enableSearchWorkerThreads = enableSearchWorkerThreads;
}

private static void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) {
Expand Down Expand Up @@ -1039,7 +1042,7 @@ public DefaultSearchContext createSearchContext(ShardSearchRequest request, Time
final Engine.SearcherSupplier reader = indexShard.acquireSearcherSupplier();
final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet());
try (ReaderContext readerContext = new ReaderContext(id, indexService, indexShard, reader, -1L, true)) {
DefaultSearchContext searchContext = createSearchContext(readerContext, request, timeout, null);
DefaultSearchContext searchContext = createSearchContext(readerContext, request, timeout, ResultsType.NONE);
searchContext.addReleasable(readerContext.markAsUsed(0L));
return searchContext;
}
Expand All @@ -1060,16 +1063,18 @@ private DefaultSearchContext createSearchContext(
reader.indexShard().shardId(),
request.getClusterAlias()
);
ExecutorService executor = threadPool.executor(Names.SEARCH_WORKER);
searchContext = new DefaultSearchContext(
reader,
request,
shardTarget,
threadPool::relativeTimeInMillis,
timeout,
minimumDocsPerSlice,
fetchPhase,
lowLevelCancellation,
this.enableConcurrentCollection && concurrentSearchEnabled(resultsType, request.source())
this.enableSearchWorkerThreads ? executor : null,
supportsConcurrency(resultsType, request.source()) == false,
minimumDocsPerSlice
);
// we clone the query shard context here just for rewriting otherwise we
// might end up with incorrect state since we are using now() or script services
Expand All @@ -1089,7 +1094,7 @@ this.enableConcurrentCollection && concurrentSearchEnabled(resultsType, request.
return searchContext;
}

static boolean concurrentSearchEnabled(ResultsType resultsType, SearchSourceBuilder source) {
static boolean supportsConcurrency(ResultsType resultsType, SearchSourceBuilder source) {
if (resultsType == ResultsType.DFS) {
return true; // only enable concurrent collection for DFS phase for now
}
Expand Down
Loading