From 507f56f64b40d4950a491adb624d74018731124f Mon Sep 17 00:00:00 2001 From: Jay Deng Date: Mon, 12 Jun 2023 13:00:51 -0700 Subject: [PATCH] Add dynamic index and cluster setting for concurrent segment search (#7956) * Add dynamic index and cluster setting for concurrent segment search Signed-off-by: Jay Deng * Use feature flagged settings map Signed-off-by: Jay Deng --------- Signed-off-by: Jay Deng Signed-off-by: Rishab Nahata --- CHANGELOG.md | 1 + distribution/src/config/opensearch.yml | 7 ++ .../common/settings/ClusterSettings.java | 4 +- .../common/settings/IndexScopedSettings.java | 4 +- .../org/opensearch/index/IndexSettings.java | 15 ++- .../search/DefaultSearchContext.java | 22 ++++ .../org/opensearch/search/SearchModule.java | 15 +-- .../org/opensearch/search/SearchService.java | 7 ++ .../search/internal/SearchContext.java | 7 ++ .../query/ConcurrentQueryPhaseSearcher.java | 14 +-- .../query/QueryPhaseSearcherWrapper.java | 82 ++++++++++++++ .../common/settings/SettingsModuleTests.java | 53 +++++++++ .../opensearch/search/SearchModuleTests.java | 6 +- .../opensearch/search/SearchServiceTests.java | 106 +++++++++++++++++- .../aggregations/AggregationSetupTests.java | 2 + .../search/query/QueryPhaseTests.java | 3 + .../opensearch/test/TestSearchContext.java | 16 +++ 17 files changed, 334 insertions(+), 30 deletions(-) create mode 100644 server/src/main/java/org/opensearch/search/query/QueryPhaseSearcherWrapper.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ad603eefaeb9..bf33d6d208b19 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add events correlation engine plugin ([#6854](https://github.com/opensearch-project/OpenSearch/issues/6854)) - Add support for ignoring missing Javadoc on generated code using annotation ([#7604](https://github.com/opensearch-project/OpenSearch/pull/7604)) - Implement concurrent aggregations support without profile option ([#7514](https://github.com/opensearch-project/OpenSearch/pull/7514)) +- Add dynamic index and cluster setting for concurrent segment search ([#7956](https://github.com/opensearch-project/OpenSearch/pull/7956)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 diff --git a/distribution/src/config/opensearch.yml b/distribution/src/config/opensearch.yml index 107fe345c942b..8c4160db98857 100644 --- a/distribution/src/config/opensearch.yml +++ b/distribution/src/config/opensearch.yml @@ -130,4 +130,11 @@ ${path.logs} # # Gates the search pipeline feature. This feature enables configurable processors # for search requests and search responses, similar to ingest pipelines. +# #opensearch.experimental.feature.search_pipeline.enabled: false +# +# +# Gates the concurrent segment search feature. This feature enables concurrent segment search in a separate +# index searcher threadpool. +# +#opensearch.experimental.feature.concurrent_segment_search.enabled: false diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 6dfa705b12896..fe1d292dbd8f6 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -673,6 +673,8 @@ public void apply(Settings value, Settings current, Settings previous) { IndicesService.CLUSTER_REMOTE_STORE_REPOSITORY_SETTING, IndicesService.CLUSTER_REMOTE_TRANSLOG_STORE_ENABLED_SETTING, IndicesService.CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING - ) + ), + List.of(FeatureFlags.CONCURRENT_SEGMENT_SEARCH), + List.of(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING) ); } diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 734830f99e6fb..95c0f3b55222f 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -235,7 +235,9 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexMetadata.INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING, IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING, IndexMetadata.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING - ) + ), + FeatureFlags.CONCURRENT_SEGMENT_SEARCH, + List.of(IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING) ); public static final IndexScopedSettings DEFAULT_SCOPED_SETTINGS = new IndexScopedSettings(Settings.EMPTY, BUILT_IN_INDEX_SETTINGS); diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index de7dc102939ce..9c6613495ba80 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -588,6 +588,13 @@ public final class IndexSettings { Property.IndexScope ); + public static final Setting INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING = Setting.boolSetting( + "index.search.concurrent_segment_search.enabled", + false, + Property.IndexScope, + Property.Dynamic + ); + private final Index index; private final Version version; private final Logger logger; @@ -1602,7 +1609,13 @@ public void setDefaultSearchPipeline(String defaultSearchPipeline) { if (FeatureFlags.isEnabled(SEARCH_PIPELINE)) { this.defaultSearchPipeline = defaultSearchPipeline; } else { - throw new SettingsException("Unsupported setting: " + DEFAULT_SEARCH_PIPELINE.getKey()); + throw new SettingsException( + "Unable to update setting: " + + DEFAULT_SEARCH_PIPELINE.getKey() + + ". This is an experimental feature that is currently disabled, please enable the " + + SEARCH_PIPELINE + + " feature flag first." + ); } } } diff --git a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java index ae369fd87345e..c50d0280aec2d 100644 --- a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java @@ -48,6 +48,7 @@ import org.opensearch.common.lucene.search.Queries; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.common.lease.Releasables; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.index.IndexService; @@ -104,6 +105,8 @@ import java.util.function.Function; import java.util.function.LongSupplier; +import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; + /** * The main search context used during search phase * @@ -869,6 +872,25 @@ public Profilers getProfilers() { return profilers; } + /** + * Returns concurrent segment search status for the search context + */ + @Override + public boolean isConcurrentSegmentSearchEnabled() { + if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH) + && (clusterService != null) + && (searcher().getExecutor() != null)) { + return indexService.getIndexSettings() + .getSettings() + .getAsBoolean( + IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), + clusterService.getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING) + ); + } else { + return false; + } + } + public void setProfilers(Profilers profilers) { this.profilers = profilers; } diff --git a/server/src/main/java/org/opensearch/search/SearchModule.java b/server/src/main/java/org/opensearch/search/SearchModule.java index a4aa1cbf0d3c2..aeb1d8325b1b8 100644 --- a/server/src/main/java/org/opensearch/search/SearchModule.java +++ b/server/src/main/java/org/opensearch/search/SearchModule.java @@ -257,9 +257,9 @@ import org.opensearch.search.fetch.subphase.highlight.Highlighter; import org.opensearch.search.fetch.subphase.highlight.PlainHighlighter; import org.opensearch.search.fetch.subphase.highlight.UnifiedHighlighter; -import org.opensearch.search.query.ConcurrentQueryPhaseSearcher; import org.opensearch.search.query.QueryPhase; import org.opensearch.search.query.QueryPhaseSearcher; +import org.opensearch.search.query.QueryPhaseSearcherWrapper; import org.opensearch.search.rescore.QueryRescorerBuilder; import org.opensearch.search.rescore.RescorerBuilder; import org.opensearch.search.sort.FieldSortBuilder; @@ -1258,8 +1258,8 @@ private QueryPhaseSearcher registerQueryPhaseSearcher(List plugins } } - if (searcher == null && FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)) { - searcher = new ConcurrentQueryPhaseSearcher(); + if (searcher == null) { + searcher = new QueryPhaseSearcherWrapper(); } return searcher; } @@ -1290,14 +1290,7 @@ public FetchPhase getFetchPhase() { } public QueryPhase getQueryPhase() { - QueryPhase queryPhase; - if (queryPhaseSearcher == null) { - // use the defaults - queryPhase = new QueryPhase(); - } else { - queryPhase = new QueryPhase(queryPhaseSearcher); - } - return queryPhase; + return new QueryPhase(queryPhaseSearcher); } public @Nullable ExecutorService getIndexSearcherExecutor(ThreadPool pool) { diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index d72759c506561..23d35cb823342 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -247,6 +247,13 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv Property.NodeScope ); + public static final Setting CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING = Setting.boolSetting( + "search.concurrent_segment_search.enabled", + true, + Property.Dynamic, + Property.NodeScope + ); + public static final int DEFAULT_SIZE = 10; public static final int DEFAULT_FROM = 0; diff --git a/server/src/main/java/org/opensearch/search/internal/SearchContext.java b/server/src/main/java/org/opensearch/search/internal/SearchContext.java index 94ae490ef7d56..79a5b89cfa881 100644 --- a/server/src/main/java/org/opensearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/SearchContext.java @@ -366,6 +366,13 @@ public final void assignRescoreDocIds(RescoreDocIds rescoreDocIds) { */ public abstract Profilers getProfilers(); + /** + * Returns concurrent segment search status for the search context + */ + public boolean isConcurrentSegmentSearchEnabled() { + return false; + } + /** * Adds a releasable that will be freed when this context is closed. */ diff --git a/server/src/main/java/org/opensearch/search/query/ConcurrentQueryPhaseSearcher.java b/server/src/main/java/org/opensearch/search/query/ConcurrentQueryPhaseSearcher.java index 844e84c081b1c..e3ba0eda4af55 100644 --- a/server/src/main/java/org/opensearch/search/query/ConcurrentQueryPhaseSearcher.java +++ b/server/src/main/java/org/opensearch/search/query/ConcurrentQueryPhaseSearcher.java @@ -48,14 +48,7 @@ protected boolean searchWithCollector( boolean hasFilterCollector, boolean hasTimeout ) throws IOException { - boolean couldUseConcurrentSegmentSearch = allowConcurrentSegmentSearch(searcher); - - if (couldUseConcurrentSegmentSearch) { - LOGGER.debug("Using concurrent search over index segments (experimental)"); - return searchWithCollectorManager(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout); - } else { - return super.searchWithCollector(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout); - } + return searchWithCollectorManager(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout); } private static boolean searchWithCollectorManager( @@ -108,9 +101,4 @@ private static boolean searchWithCollectorManager( public AggregationProcessor aggregationProcessor(SearchContext searchContext) { return aggregationProcessor; } - - private static boolean allowConcurrentSegmentSearch(final ContextIndexSearcher searcher) { - return (searcher.getExecutor() != null); - } - } diff --git a/server/src/main/java/org/opensearch/search/query/QueryPhaseSearcherWrapper.java b/server/src/main/java/org/opensearch/search/query/QueryPhaseSearcherWrapper.java new file mode 100644 index 0000000000000..407603f00461e --- /dev/null +++ b/server/src/main/java/org/opensearch/search/query/QueryPhaseSearcherWrapper.java @@ -0,0 +1,82 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.query; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.search.Query; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.search.aggregations.AggregationProcessor; +import org.opensearch.search.internal.ContextIndexSearcher; +import org.opensearch.search.internal.SearchContext; +import org.apache.lucene.search.CollectorManager; + +import java.io.IOException; +import java.util.LinkedList; + +/** + * Wrapper class for QueryPhaseSearcher that handles path selection for concurrent vs + * non-concurrent search query phase and aggregation processor. + * + * @opensearch.internal + */ +public class QueryPhaseSearcherWrapper implements QueryPhaseSearcher { + private static final Logger LOGGER = LogManager.getLogger(QueryPhaseSearcherWrapper.class); + private final QueryPhaseSearcher defaultQueryPhaseSearcher; + private final QueryPhaseSearcher concurrentQueryPhaseSearcher; + + public QueryPhaseSearcherWrapper() { + this.defaultQueryPhaseSearcher = new QueryPhase.DefaultQueryPhaseSearcher(); + this.concurrentQueryPhaseSearcher = FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH) + ? new ConcurrentQueryPhaseSearcher() + : null; + } + + /** + * Perform search using {@link CollectorManager} + * + * @param searchContext search context + * @param searcher context index searcher + * @param query query + * @param hasTimeout "true" if timeout was set, "false" otherwise + * @return is rescoring required or not + * @throws java.io.IOException IOException + */ + @Override + public boolean searchWith( + SearchContext searchContext, + ContextIndexSearcher searcher, + Query query, + LinkedList collectors, + boolean hasFilterCollector, + boolean hasTimeout + ) throws IOException { + if (searchContext.isConcurrentSegmentSearchEnabled()) { + LOGGER.info("Using concurrent search over segments (experimental)"); + return concurrentQueryPhaseSearcher.searchWith(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout); + } else { + return defaultQueryPhaseSearcher.searchWith(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout); + } + } + + /** + * {@link AggregationProcessor} to use to setup and post process aggregation related collectors during search request + * @param searchContext search context + * @return {@link AggregationProcessor} to use + */ + @Override + public AggregationProcessor aggregationProcessor(SearchContext searchContext) { + if (searchContext.isConcurrentSegmentSearchEnabled()) { + LOGGER.info("Using concurrent search over segments (experimental)"); + return concurrentQueryPhaseSearcher.aggregationProcessor(searchContext); + } else { + return defaultQueryPhaseSearcher.aggregationProcessor(searchContext); + } + } +} diff --git a/server/src/test/java/org/opensearch/common/settings/SettingsModuleTests.java b/server/src/test/java/org/opensearch/common/settings/SettingsModuleTests.java index b15d3518e2f99..4490f6b39996f 100644 --- a/server/src/test/java/org/opensearch/common/settings/SettingsModuleTests.java +++ b/server/src/test/java/org/opensearch/common/settings/SettingsModuleTests.java @@ -36,6 +36,8 @@ import org.opensearch.common.settings.Setting.Property; import org.hamcrest.Matchers; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.IndexSettings; +import org.opensearch.search.SearchService; import org.opensearch.test.FeatureFlagSetter; import java.util.Arrays; @@ -282,4 +284,55 @@ public void testDynamicIndexSettingsRegistration() { () -> module.registerDynamicSetting(Setting.floatSetting("index.custom.setting2", 1.0f, Property.IndexScope)) ); } + + public void testConcurrentSegmentSearchClusterSettings() { + // Test that we throw an exception without the feature flag + Settings settings = Settings.builder().put(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build(); + SettingsException ex = expectThrows(SettingsException.class, () -> new SettingsModule(settings)); + assertEquals( + "unknown setting [" + + SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey() + + "] please check that any required plugins are installed, or check the breaking " + + "changes documentation for removed settings", + ex.getMessage() + ); + + // Test that the settings updates correctly with the feature flag + FeatureFlagSetter.set(FeatureFlags.CONCURRENT_SEGMENT_SEARCH); + boolean settingValue = randomBoolean(); + Settings settingsWithFeatureFlag = Settings.builder() + .put(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), settingValue) + .build(); + SettingsModule settingsModule = new SettingsModule(settingsWithFeatureFlag); + assertEquals(settingValue, SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.get(settingsModule.getSettings())); + } + + public void testConcurrentSegmentSearchIndexSettings() { + Settings.Builder target = Settings.builder().put(Settings.EMPTY); + Settings.Builder update = Settings.builder(); + + // Test that we throw an exception without the feature flag + SettingsModule module = new SettingsModule(Settings.EMPTY); + IndexScopedSettings indexScopedSettings = module.getIndexScopedSettings(); + expectThrows( + SettingsException.class, + () -> indexScopedSettings.updateDynamicSettings( + Settings.builder().put(IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build(), + target, + update, + "node" + ) + ); + + // Test that the settings updates correctly with the feature flag + FeatureFlagSetter.set(FeatureFlags.CONCURRENT_SEGMENT_SEARCH); + SettingsModule moduleWithFeatureFlag = new SettingsModule(Settings.EMPTY); + IndexScopedSettings indexScopedSettingsWithFeatureFlag = moduleWithFeatureFlag.getIndexScopedSettings(); + indexScopedSettingsWithFeatureFlag.updateDynamicSettings( + Settings.builder().put(IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build(), + target, + update, + "node" + ); + } } diff --git a/server/src/test/java/org/opensearch/search/SearchModuleTests.java b/server/src/test/java/org/opensearch/search/SearchModuleTests.java index 08a19cb89ac68..c0351c9dccbc1 100644 --- a/server/src/test/java/org/opensearch/search/SearchModuleTests.java +++ b/server/src/test/java/org/opensearch/search/SearchModuleTests.java @@ -79,6 +79,7 @@ import org.opensearch.search.query.ConcurrentQueryPhaseSearcher; import org.opensearch.search.query.QueryPhase; import org.opensearch.search.query.QueryPhaseSearcher; +import org.opensearch.search.query.QueryPhaseSearcherWrapper; import org.opensearch.search.rescore.QueryRescorerBuilder; import org.opensearch.search.rescore.RescoreContext; import org.opensearch.search.rescore.RescorerBuilder; @@ -425,7 +426,7 @@ public void testDefaultQueryPhaseSearcher() { SearchModule searchModule = new SearchModule(Settings.EMPTY, Collections.emptyList()); TestSearchContext searchContext = new TestSearchContext(null); QueryPhase queryPhase = searchModule.getQueryPhase(); - assertTrue(queryPhase.getQueryPhaseSearcher() instanceof QueryPhase.DefaultQueryPhaseSearcher); + assertTrue(queryPhase.getQueryPhaseSearcher() instanceof QueryPhaseSearcherWrapper); assertTrue(queryPhase.getQueryPhaseSearcher().aggregationProcessor(searchContext) instanceof DefaultAggregationProcessor); } @@ -434,8 +435,9 @@ public void testConcurrentQueryPhaseSearcher() { FeatureFlags.initializeFeatureFlags(settings); SearchModule searchModule = new SearchModule(settings, Collections.emptyList()); TestSearchContext searchContext = new TestSearchContext(null); + searchContext.setConcurrentSegmentSearchEnabled(true); QueryPhase queryPhase = searchModule.getQueryPhase(); - assertTrue(queryPhase.getQueryPhaseSearcher() instanceof ConcurrentQueryPhaseSearcher); + assertTrue(queryPhase.getQueryPhaseSearcher() instanceof QueryPhaseSearcherWrapper); assertTrue(queryPhase.getQueryPhaseSearcher().aggregationProcessor(searchContext) instanceof ConcurrentAggregationProcessor); FeatureFlags.initializeFeatureFlags(Settings.EMPTY); } diff --git a/server/src/test/java/org/opensearch/search/SearchServiceTests.java b/server/src/test/java/org/opensearch/search/SearchServiceTests.java index 72c74ddb71725..8f8789a3a0323 100644 --- a/server/src/test/java/org/opensearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/opensearch/search/SearchServiceTests.java @@ -61,6 +61,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsException; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.common.Strings; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.core.xcontent.XContentBuilder; @@ -227,7 +228,7 @@ public void onQueryPhase(SearchContext context, long tookInNanos) { @Override protected Settings nodeSettings() { - return Settings.builder().put("search.default_search_timeout", "5s").build(); + return Settings.builder().put("search.default_search_timeout", "5s").put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, true).build(); } public void testClearOnClose() { @@ -1177,6 +1178,109 @@ public void testCreateSearchContext() throws IOException { } } + /** + * Test that the Search Context for concurrent segment search enabled is set correctly based on both + * index and cluster settings. + */ + public void testConcurrentSegmentSearchSearchContext() throws IOException { + Boolean[][] scenarios = { + // cluster setting, index setting, concurrent search enabled? + { null, null, true }, + { null, false, false }, + { null, true, true }, + { true, null, true }, + { true, false, false }, + { true, true, true }, + { false, null, false }, + { false, false, false }, + { false, true, true } }; + + String index = randomAlphaOfLengthBetween(5, 10).toLowerCase(Locale.ROOT); + IndexService indexService = createIndex(index); + final SearchService service = getInstanceFromNode(SearchService.class); + ShardId shardId = new ShardId(indexService.index(), 0); + long nowInMillis = System.currentTimeMillis(); + String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(3, 10); + SearchRequest searchRequest = new SearchRequest(); + searchRequest.allowPartialSearchResults(randomBoolean()); + ShardSearchRequest request = new ShardSearchRequest( + OriginalIndices.NONE, + searchRequest, + shardId, + indexService.numberOfShards(), + AliasFilter.EMPTY, + 1f, + nowInMillis, + clusterAlias, + Strings.EMPTY_ARRAY + ); + + for (Boolean[] scenario : scenarios) { + Boolean clusterSetting = scenario[0]; + Boolean indexSetting = scenario[1]; + Boolean concurrentSearchEnabled = scenario[2]; + + if (clusterSetting == null) { + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().putNull(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey())) + .get(); + } else { + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder().put(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), clusterSetting) + ) + .get(); + } + + if (indexSetting == null) { + client().admin() + .indices() + .prepareUpdateSettings(index) + .setSettings(Settings.builder().putNull(IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey())) + .get(); + } else { + client().admin() + .indices() + .prepareUpdateSettings(index) + .setSettings(Settings.builder().put(IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), indexSetting)) + .get(); + } + + try (DefaultSearchContext searchContext = service.createSearchContext(request, new TimeValue(System.currentTimeMillis()))) { + assertEquals( + clusterSetting, + client().admin() + .cluster() + .prepareState() + .get() + .getState() + .getMetadata() + .transientSettings() + .getAsBoolean(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), null) + ); + assertEquals( + indexSetting == null ? null : indexSetting.toString(), + client().admin() + .indices() + .prepareGetSettings(index) + .get() + .getSetting(index, IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey()) + ); + assertEquals(concurrentSearchEnabled, searchContext.isConcurrentSegmentSearchEnabled()); + } + } + // Cleanup + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().putNull(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey())) + .get(); + } + /** * While we have no NPE in DefaultContext constructor anymore, we still want to guard against it (or other failures) in the future to * avoid leaking searchers. diff --git a/server/src/test/java/org/opensearch/search/aggregations/AggregationSetupTests.java b/server/src/test/java/org/opensearch/search/aggregations/AggregationSetupTests.java index 0095fd097d3f5..73ab8d7dc814c 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/AggregationSetupTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/AggregationSetupTests.java @@ -13,6 +13,7 @@ import org.opensearch.index.IndexService; import org.opensearch.search.internal.SearchContext; import org.opensearch.test.OpenSearchSingleNodeTestCase; +import org.opensearch.test.TestSearchContext; import java.io.IOException; @@ -36,6 +37,7 @@ public void setUp() throws Exception { client().prepareIndex("idx").setId("1").setSource("f", 5).execute().get(); client().admin().indices().prepareRefresh("idx").get(); context = createSearchContext(index); + ((TestSearchContext) context).setConcurrentSegmentSearchEnabled(true); } protected AggregatorFactories getAggregationFactories(String agg) throws IOException { diff --git a/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java b/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java index 7e6d31a51bd4d..dcec4842fc81e 100644 --- a/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java +++ b/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java @@ -411,6 +411,9 @@ public void testTerminateAfterEarlyTermination() throws Exception { TestSearchContext context = new TestSearchContext(null, indexShard, newContextSearcher(reader, executor)); context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); + if (this.executor != null) { + context.setConcurrentSegmentSearchEnabled(true); + } context.terminateAfter(numDocs); { diff --git a/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java b/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java index 2b7e1450b9fbc..82e16e6c13005 100644 --- a/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java +++ b/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java @@ -115,6 +115,14 @@ public class TestSearchContext extends SearchContext { private FieldDoc searchAfter; private Profilers profilers; private CollapseContext collapse; + protected boolean concurrentSegmentSearchEnabled; + + /** + * Sets the concurrent segment search enabled field + */ + public void setConcurrentSegmentSearchEnabled(boolean concurrentSegmentSearchEnabled) { + this.concurrentSegmentSearchEnabled = concurrentSegmentSearchEnabled; + } private final Map searchExtBuilders = new HashMap<>(); @@ -605,6 +613,14 @@ public Profilers getProfilers() { return profilers; } + /** + * Returns concurrent segment search status for the search context + */ + @Override + public boolean isConcurrentSegmentSearchEnabled() { + return concurrentSegmentSearchEnabled; + } + @Override public Map, CollectorManager> queryCollectorManagers() { return queryCollectorManagers;