Skip to content
This repository has been archived by the owner on Aug 9, 2022. It is now read-only.

Commit

Permalink
Cleaning up settings name, Stats URL and redacting node details (#35)
Browse files Browse the repository at this point in the history
1. Renames the settings to have opendistro.asynchronous_search
2. Replaces _stats to stats
3. Redacts the extra private details exposed by the stats
4. Corrects throttling messages

Co-authored-by: Bukhtawar Khan <bukhtawa@amazon.com>
  • Loading branch information
eirsep and Bukhtawar authored Feb 14, 2021
1 parent 577a927 commit e6322fb
Show file tree
Hide file tree
Showing 19 changed files with 62 additions and 73 deletions.
11 changes: 6 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,15 @@ DELETE /_opendistro/_asynchronous_search/FjdITFhYbC1zVFdHVVV1MUd3UkxkMFEFMjQ1MzY
**4. Stats for asynchronous search**

```
GET /_opendistro/_asynchronous_search/_stats
GET /_opendistro/_asynchronous_search/stats
```

**Tunable Settings**
1. `opendistro_asynchronous_search.max_search_running_time` : Maximum running time for the search beyond which the search would be terminated
2. `opendistro_asynchronous_search.max_running_searches` : Concurrent searches running per coordinator node
3. `opendistro_asynchronous_search.max_keep_alive` : Maximum keep alive for search which dictates how long the search is allowed to be present in the cluster
4. `opendistro_asynchronous_search.max_wait_for_completion_timeout` : Maximum keep on completion to block for the search response
1. `opendistro.asynchronous_search.max_search_running_time` : Maximum running time for the search beyond which the search would be terminated
2. `opendistro.asynchronous_search.node_concurrent_running_searches` : Concurrent searches running per coordinator node
3. `opendistro.asynchronous_search.max_keep_alive` : Maximum keep alive for search which dictates how long the search is allowed to be present in the cluster
4. `opendistro.asynchronous_search.max_wait_for_completion_timeout` : Maximum keep on completion to block for the search response
5. `opendistro.asynchronous_search.persist_search_failures` : Persist asynchronous search result ending with search failure in system index

## Setup

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ ext {
}

group 'com.amazon.opendistroforelasticsearch'
version = "${opendistroVersion}.0"
version = "${opendistroVersion}.1"

sourceCompatibility = 1.9

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
## 2021-02-13 Version 1.13.0.1
Compatible with Elasticsearch 7.10.2
### Maintenance
* Renamed settings for consistency with other ODFE plugins ([#35](https://github.com/opendistro-for-elasticsearch/asynchronous-search
/pull/35))
Original file line number Diff line number Diff line change
Expand Up @@ -37,32 +37,33 @@
public class AsynchronousSearchActiveStore {

private static Logger logger = LogManager.getLogger(AsynchronousSearchActiveStore.class);
private volatile int maxRunningSearches;
public static final int DEFAULT_MAX_RUNNING_SEARCHES = 20;
public static final Setting<Integer> MAX_RUNNING_SEARCHES_SETTING = Setting.intSetting(
"opendistro_asynchronous_search.max_running_searches", DEFAULT_MAX_RUNNING_SEARCHES, 0, Setting.Property.Dynamic,
Setting.Property.NodeScope);
private volatile int nodeConcurrentRunningSearches;
public static final int NODE_CONCURRENT_RUNNING_SEARCHES = 20;
public static final Setting<Integer> NODE_CONCURRENT_RUNNING_SEARCHES_SETTING = Setting.intSetting(
"opendistro.asynchronous_search.node_concurrent_running_searches", NODE_CONCURRENT_RUNNING_SEARCHES, 0,
Setting.Property.Dynamic, Setting.Property.NodeScope);

private final ConcurrentMapLong<AsynchronousSearchActiveContext> activeContexts = newConcurrentMapLongWithAggressiveConcurrency();

public AsynchronousSearchActiveStore(ClusterService clusterService) {
Settings settings = clusterService.getSettings();
maxRunningSearches = MAX_RUNNING_SEARCHES_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_RUNNING_SEARCHES_SETTING, this::setMaxRunningSearches);
nodeConcurrentRunningSearches = NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(NODE_CONCURRENT_RUNNING_SEARCHES_SETTING,
this::setNodeConcurrentRunningSearches);
}

private void setMaxRunningSearches(int maxRunningSearches) {
this.maxRunningSearches = maxRunningSearches;
private void setNodeConcurrentRunningSearches(int nodeConcurrentRunningSearches) {
this.nodeConcurrentRunningSearches = nodeConcurrentRunningSearches;
}

public synchronized void putContext(AsynchronousSearchContextId asynchronousSearchContextId,
AsynchronousSearchActiveContext asynchronousSearchContext,
Consumer<AsynchronousSearchContextId> contextRejectionEventConsumer) {
if (activeContexts.size() >= maxRunningSearches) {
if (activeContexts.size() >= nodeConcurrentRunningSearches) {
contextRejectionEventConsumer.accept(asynchronousSearchContextId);
throw new EsRejectedExecutionException("Trying to create too many running contexts. Must be less than or equal to: ["
+ maxRunningSearches + "]. This limit can be set by changing the [" + MAX_RUNNING_SEARCHES_SETTING.getKey()
+ "] setting.");
throw new EsRejectedExecutionException("Trying to create too many concurrent searches. Must be less than or equal to: ["
+ nodeConcurrentRunningSearches + "]. This limit can be set by changing the ["
+ NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.getKey() + "] settings.");
}
activeContexts.put(asynchronousSearchContextId.getId(), asynchronousSearchContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,11 @@ public class AsynchronousSearchManagementService extends AbstractLifecycleCompon
"indices:data/read/opendistro/asynchronous_search/response_cleanup";

public static final Setting<TimeValue> ACTIVE_CONTEXT_REAPER_INTERVAL_SETTING =
Setting.timeSetting("opendistro_asynchronous_search.active.context.reaper_interval", TimeValue.timeValueMinutes(5),
Setting.timeSetting("opendistro.asynchronous_search.active.context.reaper_interval", TimeValue.timeValueMinutes(5),
TimeValue.timeValueSeconds(5),
Setting.Property.NodeScope);
public static final Setting<TimeValue> PERSISTED_RESPONSE_CLEAN_UP_INTERVAL_SETTING =
Setting.timeSetting("opendistro_asynchronous_search.expired.persisted_response.cleanup_interval",
Setting.timeSetting("opendistro.asynchronous_search.expired.persisted_response.cleanup_interval",
TimeValue.timeValueMinutes(5), TimeValue.timeValueSeconds(5),
Setting.Property.NodeScope);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(
AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING,
AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING,
AsynchronousSearchService.MAX_KEEP_ALIVE_SETTING,
AsynchronousSearchService.MAX_SEARCH_RUNNING_TIME_SETTING,
AsynchronousSearchService.MAX_WAIT_FOR_COMPLETION_TIMEOUT_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public String getName() {
@Override
public List<Route> routes() {
return Arrays.asList(
new Route(GET, AsynchronousSearchPlugin.BASE_URI + "/_nodes/{nodeId}/_stats"),
new Route(GET, AsynchronousSearchPlugin.BASE_URI + "/_stats")
new Route(GET, AsynchronousSearchPlugin.BASE_URI + "/_nodes/{nodeId}/stats"),
new Route(GET, AsynchronousSearchPlugin.BASE_URI + "/stats")
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,17 +108,17 @@ public class AsynchronousSearchService extends AbstractLifecycleComponent implem
private static final Logger logger = LogManager.getLogger(AsynchronousSearchService.class);

public static final Setting<TimeValue> MAX_KEEP_ALIVE_SETTING =
Setting.positiveTimeSetting("opendistro_asynchronous_search.max_keep_alive", timeValueDays(5),
Setting.positiveTimeSetting("opendistro.asynchronous_search.max_keep_alive", timeValueDays(5),
Setting.Property.NodeScope, Setting.Property.Dynamic);
public static final Setting<TimeValue> MAX_SEARCH_RUNNING_TIME_SETTING =
Setting.positiveTimeSetting("opendistro_asynchronous_search.max_search_running_time", timeValueHours(12),
Setting.positiveTimeSetting("opendistro.asynchronous_search.max_search_running_time", timeValueHours(12),
Setting.Property.NodeScope, Setting.Property.Dynamic);
public static final Setting<TimeValue> MAX_WAIT_FOR_COMPLETION_TIMEOUT_SETTING = Setting.positiveTimeSetting(
"opendistro_asynchronous_search.max_wait_for_completion_timeout", timeValueMinutes(1), Setting.Property.NodeScope,
"opendistro.asynchronous_search.max_wait_for_completion_timeout", timeValueMinutes(1), Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<Boolean> PERSIST_SEARCH_FAILURES_SETTING =
Setting.boolSetting("opendistro_asynchronous_search.persist_search_failures", false,
Setting.boolSetting("opendistro.asynchronous_search.persist_search_failures", false,
Setting.Property.NodeScope, Setting.Property.Dynamic);

private volatile long maxKeepAlive;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@

import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Map;

/**
* Class represents all stats the plugin keeps track of on a single node
Expand Down Expand Up @@ -56,24 +54,6 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("name", getNode().getName());
builder.field("transport_address", getNode().getAddress().toString());
builder.field("host", getNode().getHostName());
builder.field("ip", getNode().getAddress());

builder.startArray("roles");
for (DiscoveryNodeRole role : getNode().getRoles()) {
builder.value(role.roleName());
}
builder.endArray();

if (getNode().getAttributes().isEmpty() == false) {
builder.startObject("attributes");
for (Map.Entry<String, String> attrEntry : getNode().getAttributes().entrySet()) {
builder.field(attrEntry.getKey(), attrEntry.getValue());
}
builder.endObject();
}
if (asynchronousSearchCountStats != null) {
asynchronousSearchCountStats.toXContent(builder, params);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ public void createObjects() {
Settings settings = Settings.builder()
.put("node.name", "test")
.put("cluster.name", "ClusterServiceTests")
.put(AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING.getKey(), maxRunningContexts)
.put(AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.getKey(), maxRunningContexts)
.build();
final Set<Setting<?>> settingsSet =
Stream.concat(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(), Stream.of(
AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING)).collect(Collectors.toSet());
AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING)).collect(Collectors.toSet());
final int availableProcessors = EsExecutors.allocatedProcessors(settings);
List<ExecutorBuilder<?>> executorBuilders = new ArrayList<>();
executorBuilders.add(new ScalingExecutorBuilder(AsynchronousSearchPlugin.OPEN_DISTRO_ASYNC_SEARCH_GENERIC_THREAD_POOL_NAME, 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void onFailure(Exception e) {
@Override
public void onFailure(Exception e) {
responses.add(e);
assertThat(e.getMessage(), startsWith("Trying to create too many running contexts"));
assertThat(e.getMessage(), startsWith("Trying to create too many concurrent searches"));
latch.countDown();
}
}, latch));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
logger.info("Using lowLevelCancellation: {}", lowLevelCancellation);
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING.getKey(), asConcurrentLimit)
.put(AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.getKey(), asConcurrentLimit)
.put(AsynchronousSearchService.PERSIST_SEARCH_FAILURES_SETTING.getKey(), true)
.build();
}
Expand Down Expand Up @@ -269,7 +269,7 @@ public void testThrottledAsynchronousSearchCount() throws InterruptedException,
SubmitAsynchronousSearchRequest submitAsynchronousSearchRequest = new SubmitAsynchronousSearchRequest(searchRequest);
executeSubmitAsynchronousSearch(client(randomDataNode.getName()), submitAsynchronousSearchRequest);
} catch (ExecutionException e) {
assertThat(e.getMessage(), containsString("Trying to create too many running contexts"));
assertThat(e.getMessage(), containsString("Trying to create too many concurrent searches"));
} catch (InterruptedException e) {
fail(e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,12 @@

public class SubmitAsynchronousSearchSingleNodeIT extends AsynchronousSearchSingleNodeTestCase {

private int asConcurrentLimit = 60;
private int asynchronousSearchConcurrentLimit = 60;

@Override
protected Settings nodeSettings() {
return Settings.builder().put(AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING.getKey(), asConcurrentLimit).build();
return Settings.builder().put(AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.getKey(),
asynchronousSearchConcurrentLimit).build();
}

public void
Expand Down Expand Up @@ -101,12 +102,12 @@ public void testSubmitAsynchronousSearchWithRetainedResponse() throws Interrupte
}

public void testSubmitAsynchronousSearchWithNoRetainedResponseBlocking() throws Exception {
int concurrentRuns = randomIntBetween(asConcurrentLimit + 10, asConcurrentLimit + 20);
int concurrentRuns = randomIntBetween(asynchronousSearchConcurrentLimit + 10, asynchronousSearchConcurrentLimit + 20);
assertConcurrentSubmitsForBlockedSearch((numStartedAsynchronousSearch, numFailedAsynchronousSearch,
numRejectedAsynchronousSearch) -> {
assertEquals(asConcurrentLimit, numStartedAsynchronousSearch.get());
assertEquals(concurrentRuns - asConcurrentLimit, numFailedAsynchronousSearch.get());
assertEquals(concurrentRuns - asConcurrentLimit, numRejectedAsynchronousSearch.get());
assertEquals(asynchronousSearchConcurrentLimit, numStartedAsynchronousSearch.get());
assertEquals(concurrentRuns - asynchronousSearchConcurrentLimit, numFailedAsynchronousSearch.get());
assertEquals(concurrentRuns - asynchronousSearchConcurrentLimit, numRejectedAsynchronousSearch.get());
}, concurrentRuns);
AsynchronousSearchService asynchronousSearchService = getInstanceFromNode(AsynchronousSearchService.class);
waitUntil(asynchronousSearchService.getAllActiveContexts()::isEmpty,30, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

import static com.amazon.opendistroforelasticsearch.search.asynchronous.context.active.AsynchronousSearchActiveStore.DEFAULT_MAX_RUNNING_SEARCHES;
import static com.amazon.opendistroforelasticsearch.search.asynchronous.context.active.AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES;
import static org.hamcrest.Matchers.containsString;

public class AsynchronousSearchSettingsIT extends AsynchronousSearchRestTestCase {
Expand Down Expand Up @@ -98,7 +98,7 @@ public void testMaxRunningAsynchronousSearchContexts() throws Exception {
thread.join();
}

updateClusterSettings(AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING.getKey(), 0);
updateClusterSettings(AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.getKey(), 0);
threadsList.clear();
AtomicInteger numFailures = new AtomicInteger();
for (int i = 0; i < numThreads; i++) {
Expand All @@ -109,7 +109,7 @@ public void testMaxRunningAsynchronousSearchContexts() throws Exception {
AsynchronousSearchResponse asResponse = executeSubmitAsynchronousSearch(validRequest);
} catch (Exception e) {
assertTrue(e instanceof ResponseException);
assertThat(e.getMessage(), containsString("Trying to create too many running contexts"));
assertThat(e.getMessage(), containsString("Trying to create too many concurrent searches"));
numFailures.getAndIncrement();

} finally {
Expand All @@ -128,7 +128,8 @@ public void testMaxRunningAsynchronousSearchContexts() throws Exception {
thread.join();
}
assertEquals(numFailures.get(), 50);
updateClusterSettings(AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING.getKey(), DEFAULT_MAX_RUNNING_SEARCHES);
updateClusterSettings(AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.getKey(),
NODE_CONCURRENT_RUNNING_SEARCHES);
}

public void testStoreAsyncSearchWithFailures() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,11 @@ public void createObjects() {
Settings settings = Settings.builder()
.put("node.name", "test")
.put("cluster.name", "ClusterServiceTests")
.put(AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING.getKey(), 10)
.put(AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.getKey(), 10)
.build();
final Set<Setting<?>> settingsSet =
Stream.concat(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(), Stream.of(
AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING,
AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING,
AsynchronousSearchService.PERSIST_SEARCH_FAILURES_SETTING,
AsynchronousSearchService.MAX_KEEP_ALIVE_SETTING,
AsynchronousSearchService.MAX_SEARCH_RUNNING_TIME_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ public void createObjects() {
Settings settings = Settings.builder()
.put("node.name", "test")
.put("cluster.name", "ClusterServiceTests")
.put(AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING.getKey(), 10)
.put(AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.getKey(), 10)
.build();
final Set<Setting<?>> settingsSet =
Stream.concat(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(), Stream.of(
AsynchronousSearchActiveStore.MAX_RUNNING_SEARCHES_SETTING,
AsynchronousSearchActiveStore.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING,
AsynchronousSearchService.MAX_KEEP_ALIVE_SETTING,
AsynchronousSearchService.PERSIST_SEARCH_FAILURES_SETTING,
AsynchronousSearchService.MAX_SEARCH_RUNNING_TIME_SETTING,
Expand Down
Loading

0 comments on commit e6322fb

Please sign in to comment.