Skip to content
This repository has been archived by the owner on Aug 9, 2022. It is now read-only.

Cleaning up settings name, Stats URL and redacting node details #35

Merged
merged 7 commits into from
Feb 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,15 @@ DELETE /_opendistro/_asynchronous_search/FjdITFhYbC1zVFdHVVV1MUd3UkxkMFEFMjQ1MzY
**4. Stats for asynchronous search**

```
GET /_opendistro/_asynchronous_search/_stats
GET /_opendistro/_asynchronous_search/stats
```

**Tunable Settings**
1. `opendistro_asynchronous_search.max_search_running_time` : Maximum running time for the search beyond which the search would be terminated
2. `opendistro_asynchronous_search.max_running_searches` : Concurrent searches running per coordinator node
3. `opendistro_asynchronous_search.max_keep_alive` : Maximum keep alive for search which dictates how long the search is allowed to be present in the cluster
4. `opendistro_asynchronous_search.max_wait_for_completion_timeout` : Maximum keep on completion to block for the search response
1. `opendistro.asynchronous_search.max_search_running_time` : Maximum running time for the search beyond which the search would be terminated
2. `opendistro.asynchronous_search.node_concurrent_running_searches` : Concurrent searches running per coordinator node
3. `opendistro.asynchronous_search.max_keep_alive` : Maximum keep alive for search which dictates how long the search is allowed to be present in the cluster
4. `opendistro.asynchronous_search.max_wait_for_completion_timeout` : Maximum keep on completion to block for the search response
5. `opendistro.asynchronous_search.persist_search_failures` : Persist asynchronous search result ending with search failure in system index

## Setup

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ ext {
}

group 'com.amazon.opendistroforelasticsearch'
version = "${opendistroVersion}.0"
version = "${opendistroVersion}.1"

sourceCompatibility = 1.9

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
## 2021-02-13 Version 1.13.0.1
Compatible with Elasticsearch 7.10.2
### Maintenance
* Renamed settings for consistency with other ODFE plugins ([#35](https://github.com/opendistro-for-elasticsearch/asynchronous-search
/pull/35))
Original file line number Diff line number Diff line change
Expand Up @@ -37,32 +37,33 @@
public class AsynchronousSearchActiveStore {

private static Logger logger = LogManager.getLogger(AsynchronousSearchActiveStore.class);
private volatile int maxRunningSearches;
public static final int DEFAULT_MAX_RUNNING_SEARCHES = 20;
public static final Setting<Integer> MAX_RUNNING_SEARCHES_SETTING = Setting.intSetting(
"opendistro_asynchronous_search.max_running_searches", DEFAULT_MAX_RUNNING_SEARCHES, 0, Setting.Property.Dynamic,
Setting.Property.NodeScope);
private volatile int nodeConcurrentRunningSearches;
public static final int NODE_CONCURRENT_RUNNING_SEARCHES = 20;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add something to variable name to signify this is max allowed per node ?

public static final Setting<Integer> NODE_CONCURRENT_RUNNING_SEARCHES_SETTING = Setting.intSetting(
"opendistro.asynchronous_search.node_concurrent_running_searches", NODE_CONCURRENT_RUNNING_SEARCHES, 0,
Setting.Property.Dynamic, Setting.Property.NodeScope);

private final ConcurrentMapLong<AsynchronousSearchActiveContext> activeContexts = newConcurrentMapLongWithAggressiveConcurrency();

public AsynchronousSearchActiveStore(ClusterService clusterService) {
Settings settings = clusterService.getSettings();
maxRunningSearches = MAX_RUNNING_SEARCHES_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_RUNNING_SEARCHES_SETTING, this::setMaxRunningSearches);
nodeConcurrentRunningSearches = NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(NODE_CONCURRENT_RUNNING_SEARCHES_SETTING,
this::setNodeConcurrentRunningSearches);
}

private void setMaxRunningSearches(int maxRunningSearches) {
this.maxRunningSearches = maxRunningSearches;
private void setNodeConcurrentRunningSearches(int nodeConcurrentRunningSearches) {
this.nodeConcurrentRunningSearches = nodeConcurrentRunningSearches;
}

public synchronized void putContext(AsynchronousSearchContextId asynchronousSearchContextId,
AsynchronousSearchActiveContext asynchronousSearchContext,
Consumer<AsynchronousSearchContextId> contextRejectionEventConsumer) {
if (activeContexts.size() >= maxRunningSearches) {
if (activeContexts.size() >= nodeConcurrentRunningSearches) {
contextRejectionEventConsumer.accept(asynchronousSearchContextId);
throw new EsRejectedExecutionException("Trying to create too many running contexts. Must be less than or equal to: ["
+ maxRunningSearches + "]. This limit can be set by changing the [" + MAX_RUNNING_SEARCHES_SETTING.getKey()
+ "] setting.");
throw new EsRejectedExecutionException("Trying to create too many concurrent searches. Must be less than or equal to: ["
+ nodeConcurrentRunningSearches + "]. This limit can be set by changing the ["
+ NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.getKey() + "] settings.");
}
activeContexts.put(asynchronousSearchContextId.getId(), asynchronousSearchContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,11 @@ public class AsynchronousSearchManagementService extends AbstractLifecycleCompon
"indices:data/read/opendistro/asynchronous_search/response_cleanup";

public static final Setting<TimeValue> ACTIVE_CONTEXT_REAPER_INTERVAL_SETTING =
Setting.timeSetting("opendistro_asynchronous_search.active.context.reaper_interval", TimeValue.timeValueMinutes(5),
Setting.timeSetting("opendistro.asynchronous_search.active.context.reaper_interval", TimeValue.timeValueMinutes(5),
TimeValue.timeValueSeconds(5),
Setting.Property.NodeScope);
public static final Setting<TimeValue> PERSISTED_RESPONSE_CLEAN_UP_INTERVAL_SETTING =
Setting.timeSetting("opendistro_asynchronous_search.expired.persisted_response.cleanup_interval",
Setting.timeSetting("opendistro.asynchronous_search.expired.persisted_response.cleanup_interval",
TimeValue.timeValueMinutes(5), TimeValue.timeValueSeconds(5),
Setting.Property.NodeScope);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(
AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING,
AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING,
AsynchronousSearchService.MAX_KEEP_ALIVE_SETTING,
AsynchronousSearchService.MAX_SEARCH_RUNNING_TIME_SETTING,
AsynchronousSearchService.MAX_WAIT_FOR_COMPLETION_TIMEOUT_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public String getName() {
@Override
public List<Route> routes() {
return Arrays.asList(
new Route(GET, AsynchronousSearchPlugin.BASE_URI + "/_nodes/{nodeId}/_stats"),
new Route(GET, AsynchronousSearchPlugin.BASE_URI + "/_stats")
new Route(GET, AsynchronousSearchPlugin.BASE_URI + "/_nodes/{nodeId}/stats"),
new Route(GET, AsynchronousSearchPlugin.BASE_URI + "/stats")
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,17 +108,17 @@ public class AsynchronousSearchService extends AbstractLifecycleComponent implem
private static final Logger logger = LogManager.getLogger(AsynchronousSearchService.class);

public static final Setting<TimeValue> MAX_KEEP_ALIVE_SETTING =
Setting.positiveTimeSetting("opendistro_asynchronous_search.max_keep_alive", timeValueDays(5),
Setting.positiveTimeSetting("opendistro.asynchronous_search.max_keep_alive", timeValueDays(5),
Setting.Property.NodeScope, Setting.Property.Dynamic);
public static final Setting<TimeValue> MAX_SEARCH_RUNNING_TIME_SETTING =
Setting.positiveTimeSetting("opendistro_asynchronous_search.max_search_running_time", timeValueHours(12),
Setting.positiveTimeSetting("opendistro.asynchronous_search.max_search_running_time", timeValueHours(12),
Setting.Property.NodeScope, Setting.Property.Dynamic);
public static final Setting<TimeValue> MAX_WAIT_FOR_COMPLETION_TIMEOUT_SETTING = Setting.positiveTimeSetting(
"opendistro_asynchronous_search.max_wait_for_completion_timeout", timeValueMinutes(1), Setting.Property.NodeScope,
"opendistro.asynchronous_search.max_wait_for_completion_timeout", timeValueMinutes(1), Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<Boolean> PERSIST_SEARCH_FAILURES_SETTING =
Setting.boolSetting("opendistro_asynchronous_search.persist_search_failures", false,
Setting.boolSetting("opendistro.asynchronous_search.persist_search_failures", false,
Setting.Property.NodeScope, Setting.Property.Dynamic);

private volatile long maxKeepAlive;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@

import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Map;

/**
* Class represents all stats the plugin keeps track of on a single node
Expand Down Expand Up @@ -56,24 +54,6 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("name", getNode().getName());
builder.field("transport_address", getNode().getAddress().toString());
builder.field("host", getNode().getHostName());
builder.field("ip", getNode().getAddress());

builder.startArray("roles");
for (DiscoveryNodeRole role : getNode().getRoles()) {
builder.value(role.roleName());
}
builder.endArray();

if (getNode().getAttributes().isEmpty() == false) {
builder.startObject("attributes");
for (Map.Entry<String, String> attrEntry : getNode().getAttributes().entrySet()) {
builder.field(attrEntry.getKey(), attrEntry.getValue());
}
builder.endObject();
}
if (asynchronousSearchCountStats != null) {
asynchronousSearchCountStats.toXContent(builder, params);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ public void createObjects() {
Settings settings = Settings.builder()
.put("node.name", "test")
.put("cluster.name", "ClusterServiceTests")
.put(AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING.getKey(), maxRunningContexts)
.put(AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.getKey(), maxRunningContexts)
.build();
final Set<Setting<?>> settingsSet =
Stream.concat(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(), Stream.of(
AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING)).collect(Collectors.toSet());
AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING)).collect(Collectors.toSet());
final int availableProcessors = EsExecutors.allocatedProcessors(settings);
List<ExecutorBuilder<?>> executorBuilders = new ArrayList<>();
executorBuilders.add(new ScalingExecutorBuilder(AsynchronousSearchPlugin.OPEN_DISTRO_ASYNC_SEARCH_GENERIC_THREAD_POOL_NAME, 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void onFailure(Exception e) {
@Override
public void onFailure(Exception e) {
responses.add(e);
assertThat(e.getMessage(), startsWith("Trying to create too many running contexts"));
assertThat(e.getMessage(), startsWith("Trying to create too many concurrent searches"));
latch.countDown();
}
}, latch));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
logger.info("Using lowLevelCancellation: {}", lowLevelCancellation);
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING.getKey(), asConcurrentLimit)
.put(AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.getKey(), asConcurrentLimit)
.put(AsynchronousSearchService.PERSIST_SEARCH_FAILURES_SETTING.getKey(), true)
.build();
}
Expand Down Expand Up @@ -269,7 +269,7 @@ public void testThrottledAsynchronousSearchCount() throws InterruptedException,
SubmitAsynchronousSearchRequest submitAsynchronousSearchRequest = new SubmitAsynchronousSearchRequest(searchRequest);
executeSubmitAsynchronousSearch(client(randomDataNode.getName()), submitAsynchronousSearchRequest);
} catch (ExecutionException e) {
assertThat(e.getMessage(), containsString("Trying to create too many running contexts"));
assertThat(e.getMessage(), containsString("Trying to create too many concurrent searches"));
} catch (InterruptedException e) {
fail(e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,12 @@

public class SubmitAsynchronousSearchSingleNodeIT extends AsynchronousSearchSingleNodeTestCase {

private int asConcurrentLimit = 60;
private int asynchronousSearchConcurrentLimit = 60;

@Override
protected Settings nodeSettings() {
return Settings.builder().put(AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING.getKey(), asConcurrentLimit).build();
return Settings.builder().put(AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.getKey(),
asynchronousSearchConcurrentLimit).build();
}

public void
Expand Down Expand Up @@ -101,12 +102,12 @@ public void testSubmitAsynchronousSearchWithRetainedResponse() throws Interrupte
}

public void testSubmitAsynchronousSearchWithNoRetainedResponseBlocking() throws Exception {
int concurrentRuns = randomIntBetween(asConcurrentLimit + 10, asConcurrentLimit + 20);
int concurrentRuns = randomIntBetween(asynchronousSearchConcurrentLimit + 10, asynchronousSearchConcurrentLimit + 20);
assertConcurrentSubmitsForBlockedSearch((numStartedAsynchronousSearch, numFailedAsynchronousSearch,
numRejectedAsynchronousSearch) -> {
assertEquals(asConcurrentLimit, numStartedAsynchronousSearch.get());
assertEquals(concurrentRuns - asConcurrentLimit, numFailedAsynchronousSearch.get());
assertEquals(concurrentRuns - asConcurrentLimit, numRejectedAsynchronousSearch.get());
assertEquals(asynchronousSearchConcurrentLimit, numStartedAsynchronousSearch.get());
assertEquals(concurrentRuns - asynchronousSearchConcurrentLimit, numFailedAsynchronousSearch.get());
assertEquals(concurrentRuns - asynchronousSearchConcurrentLimit, numRejectedAsynchronousSearch.get());
}, concurrentRuns);
AsynchronousSearchService asynchronousSearchService = getInstanceFromNode(AsynchronousSearchService.class);
waitUntil(asynchronousSearchService.getAllActiveContexts()::isEmpty,30, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

import static com.amazon.opendistroforelasticsearch.search.asynchronous.context.active.AsynchronousSearchActiveStore.DEFAULT_MAX_RUNNING_SEARCHES;
import static com.amazon.opendistroforelasticsearch.search.asynchronous.context.active.AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES;
import static org.hamcrest.Matchers.containsString;

public class AsynchronousSearchSettingsIT extends AsynchronousSearchRestTestCase {
Expand Down Expand Up @@ -98,7 +98,7 @@ public void testMaxRunningAsynchronousSearchContexts() throws Exception {
thread.join();
}

updateClusterSettings(AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING.getKey(), 0);
updateClusterSettings(AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.getKey(), 0);
threadsList.clear();
AtomicInteger numFailures = new AtomicInteger();
for (int i = 0; i < numThreads; i++) {
Expand All @@ -109,7 +109,7 @@ public void testMaxRunningAsynchronousSearchContexts() throws Exception {
AsynchronousSearchResponse asResponse = executeSubmitAsynchronousSearch(validRequest);
} catch (Exception e) {
assertTrue(e instanceof ResponseException);
assertThat(e.getMessage(), containsString("Trying to create too many running contexts"));
assertThat(e.getMessage(), containsString("Trying to create too many concurrent searches"));
numFailures.getAndIncrement();

} finally {
Expand All @@ -128,7 +128,8 @@ public void testMaxRunningAsynchronousSearchContexts() throws Exception {
thread.join();
}
assertEquals(numFailures.get(), 50);
updateClusterSettings(AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING.getKey(), DEFAULT_MAX_RUNNING_SEARCHES);
updateClusterSettings(AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.getKey(),
NODE_CONCURRENT_RUNNING_SEARCHES);
}

public void testStoreAsyncSearchWithFailures() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,11 @@ public void createObjects() {
Settings settings = Settings.builder()
.put("node.name", "test")
.put("cluster.name", "ClusterServiceTests")
.put(AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING.getKey(), 10)
.put(AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.getKey(), 10)
.build();
final Set<Setting<?>> settingsSet =
Stream.concat(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(), Stream.of(
AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING,
AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING,
AsynchronousSearchService.PERSIST_SEARCH_FAILURES_SETTING,
AsynchronousSearchService.MAX_KEEP_ALIVE_SETTING,
AsynchronousSearchService.MAX_SEARCH_RUNNING_TIME_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ public void createObjects() {
Settings settings = Settings.builder()
.put("node.name", "test")
.put("cluster.name", "ClusterServiceTests")
.put(AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING.getKey(), 10)
.put(AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.getKey(), 10)
.build();
final Set<Setting<?>> settingsSet =
Stream.concat(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(), Stream.of(
AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING,
AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING,
AsynchronousSearchService.MAX_KEEP_ALIVE_SETTING,
AsynchronousSearchService.PERSIST_SEARCH_FAILURES_SETTING,
AsynchronousSearchService.MAX_SEARCH_RUNNING_TIME_SETTING,
Expand Down
Loading