Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Adding RestActions support for Detector Stats API #237

Merged
merged 3 commits into from
Sep 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ List<String> jacocoExclusions = [
'com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorRunner',
'com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices',
'com.amazon.opendistroforelasticsearch.ad.util.ParseUtils',
'com.amazon.opendistroforelasticsearch.ad.transport.StatsAnomalyDetectorTransportAction'
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we are already covering them in our IntegTests, its getting complicated to unit test it.

]

jacocoTestCoverageVerification {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@
import com.amazon.opendistroforelasticsearch.ad.transport.SearchAnomalyDetectorTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.SearchAnomalyResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.SearchAnomalyResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.StatsAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.transport.StatsAnomalyDetectorTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultAction;
Expand Down Expand Up @@ -221,11 +223,7 @@ public List<RestHandler> getRestHandlers(
clusterService,
anomalyDetectorRunner
);
RestStatsAnomalyDetectorAction statsAnomalyDetectorAction = new RestStatsAnomalyDetectorAction(
adStats,
this.nodeFilter,
this.clusterService
);
RestStatsAnomalyDetectorAction statsAnomalyDetectorAction = new RestStatsAnomalyDetectorAction(adStats, this.nodeFilter);
RestAnomalyDetectorJobAction anomalyDetectorJobAction = new RestAnomalyDetectorJobAction(
settings,
clusterService,
Expand Down Expand Up @@ -466,7 +464,8 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
new ActionHandler<>(ProfileAction.INSTANCE, ProfileTransportAction.class),
new ActionHandler<>(RCFPollingAction.INSTANCE, RCFPollingTransportAction.class),
new ActionHandler<>(SearchAnomalyDetectorAction.INSTANCE, SearchAnomalyDetectorTransportAction.class),
new ActionHandler<>(SearchAnomalyResultAction.INSTANCE, SearchAnomalyResultTransportAction.class)
new ActionHandler<>(SearchAnomalyResultAction.INSTANCE, SearchAnomalyResultTransportAction.class),
new ActionHandler<>(StatsAnomalyDetectorAction.INSTANCE, StatsAnomalyDetectorTransportAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,25 @@
import static com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin.AD_BASE_URI;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestToXContentListener;

import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.settings.EnabledSetting;
import com.amazon.opendistroforelasticsearch.ad.stats.ADStats;
import com.amazon.opendistroforelasticsearch.ad.stats.ADStatsResponse;
import com.amazon.opendistroforelasticsearch.ad.stats.StatNames;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsRequest;
import com.amazon.opendistroforelasticsearch.ad.transport.StatsAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer;
import com.amazon.opendistroforelasticsearch.ad.util.MultiResponsesDelegateActionListener;
import com.google.common.collect.ImmutableList;

/**
Expand All @@ -65,12 +54,10 @@ public class RestStatsAnomalyDetectorAction extends BaseRestHandler {
*
* @param adStats ADStats object
* @param nodeFilter util class to get eligible data nodes
* @param clusterService ClusterService
*/
public RestStatsAnomalyDetectorAction(ADStats adStats, DiscoveryNodeFilterer nodeFilter, ClusterService clusterService) {
public RestStatsAnomalyDetectorAction(ADStats adStats, DiscoveryNodeFilterer nodeFilter) {
this.adStats = adStats;
this.nodeFilter = nodeFilter;
this.clusterService = clusterService;
}

@Override
Expand All @@ -84,7 +71,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
throw new IllegalStateException(CommonErrorMessages.DISABLED_ERR_MSG);
}
ADStatsRequest adStatsRequest = getRequest(request);
return channel -> getStats(client, channel, adStatsRequest);
return channel -> client.execute(StatsAnomalyDetectorAction.INSTANCE, adStatsRequest, new RestToXContentListener<>(channel));
}

/**
Expand Down Expand Up @@ -141,116 +128,6 @@ private ADStatsRequest getRequest(RestRequest request) {
return adStatsRequest;
}

/**
* Make the 2 requests to get the node and cluster statistics
*
* @param client Client
* @param channel Channel to send response
* @param adStatsRequest Request containing stats to be retrieved
*/
private void getStats(Client client, RestChannel channel, ADStatsRequest adStatsRequest) {
// Use MultiResponsesDelegateActionListener to execute 2 async requests and create the response once they finish
MultiResponsesDelegateActionListener<ADStatsResponse> delegateListener = new MultiResponsesDelegateActionListener<>(
getRestStatsListener(channel),
2,
"Unable to return AD Stats"
);

getClusterStats(client, delegateListener, adStatsRequest);
getNodeStats(client, delegateListener, adStatsRequest);
}

/**
* Make async request to get the number of detectors in AnomalyDetector.ANOMALY_DETECTORS_INDEX if necessary
* and, onResponse, gather the cluster statistics
*
* @param client Client
* @param listener MultiResponsesDelegateActionListener to be used once both requests complete
* @param adStatsRequest Request containing stats to be retrieved
*/
private void getClusterStats(
Client client,
MultiResponsesDelegateActionListener<ADStatsResponse> listener,
ADStatsRequest adStatsRequest
) {
ADStatsResponse adStatsResponse = new ADStatsResponse();
if (adStatsRequest.getStatsToBeRetrieved().contains(StatNames.DETECTOR_COUNT.getName())) {
if (clusterService.state().getRoutingTable().hasIndex(AnomalyDetector.ANOMALY_DETECTORS_INDEX)) {
final SearchRequest request = client
.prepareSearch(AnomalyDetector.ANOMALY_DETECTORS_INDEX)
.setSize(0)
.setTrackTotalHits(true)
.request();
client.search(request, ActionListener.wrap(indicesStatsResponse -> {
adStats.getStat(StatNames.DETECTOR_COUNT.getName()).setValue(indicesStatsResponse.getHits().getTotalHits().value);
adStatsResponse.setClusterStats(getClusterStatsMap(adStatsRequest));
listener.onResponse(adStatsResponse);
}, e -> listener.onFailure(new RuntimeException("Failed to get AD cluster stats", e))));
} else {
adStats.getStat(StatNames.DETECTOR_COUNT.getName()).setValue(0L);
adStatsResponse.setClusterStats(getClusterStatsMap(adStatsRequest));
listener.onResponse(adStatsResponse);
}
} else {
adStatsResponse.setClusterStats(getClusterStatsMap(adStatsRequest));
listener.onResponse(adStatsResponse);
}
}

/**
* Make async request to get the Anomaly Detection statistics from each node and, onResponse, set the
* ADStatsNodesResponse field of ADStatsResponse
*
* @param client Client
* @param listener MultiResponsesDelegateActionListener to be used once both requests complete
* @param adStatsRequest Request containing stats to be retrieved
*/
private void getNodeStats(
Client client,
MultiResponsesDelegateActionListener<ADStatsResponse> listener,
ADStatsRequest adStatsRequest
) {
client.execute(ADStatsNodesAction.INSTANCE, adStatsRequest, ActionListener.wrap(adStatsResponse -> {
ADStatsResponse restADStatsResponse = new ADStatsResponse();
restADStatsResponse.setADStatsNodesResponse(adStatsResponse);
listener.onResponse(restADStatsResponse);
}, listener::onFailure));
}

/**
* Collect Cluster Stats into map to be retrieved
*
* @param adStatsRequest Request containing stats to be retrieved
* @return Map containing Cluster Stats
*/
private Map<String, Object> getClusterStatsMap(ADStatsRequest adStatsRequest) {
Map<String, Object> clusterStats = new HashMap<>();
Set<String> statsToBeRetrieved = adStatsRequest.getStatsToBeRetrieved();
adStats
.getClusterStats()
.entrySet()
.stream()
.filter(s -> statsToBeRetrieved.contains(s.getKey()))
.forEach(s -> clusterStats.put(s.getKey(), s.getValue().getValue()));
return clusterStats;
}

/**
* Listener sends response once Node Stats and Cluster Stats are gathered
*
* @param channel Channel
* @return ActionListener for ADStatsResponse
*/
private ActionListener<ADStatsResponse> getRestStatsListener(RestChannel channel) {
return ActionListener
.wrap(
adStatsResponse -> {
channel.sendResponse(new BytesRestResponse(RestStatus.OK, adStatsResponse.toXContent(channel.newBuilder())));
},
exception -> channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, exception.getMessage()))
);
}

@Override
public List<Route> routes() {
return ImmutableList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand Down Expand Up @@ -82,6 +84,18 @@ public XContentBuilder toXContent(XContentBuilder builder) throws IOException {
return toXContent(builder, ToXContent.EMPTY_PARAMS);
}

public ADStatsResponse() {}

public ADStatsResponse(StreamInput in) throws IOException {
adStatsNodesResponse = new ADStatsNodesResponse(in);
clusterStats = in.readMap();
}

public void writeTo(StreamOutput out) throws IOException {
adStatsNodesResponse.writeTo(out);
out.writeMap(clusterStats);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.transport;

import org.elasticsearch.action.ActionType;

public class StatsAnomalyDetectorAction extends ActionType<StatsAnomalyDetectorResponse> {
public static final StatsAnomalyDetectorAction INSTANCE = new StatsAnomalyDetectorAction();
public static final String NAME = "cluster:admin/opendistro/ad/detector/stats";

private StatsAnomalyDetectorAction() {
super(NAME, StatsAnomalyDetectorResponse::new);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.transport;

import java.io.IOException;

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;

import com.amazon.opendistroforelasticsearch.ad.stats.ADStatsResponse;

public class StatsAnomalyDetectorResponse extends ActionResponse implements ToXContentObject {
private ADStatsResponse adStatsResponse;

public StatsAnomalyDetectorResponse(StreamInput in) throws IOException {
super(in);
adStatsResponse = new ADStatsResponse(in);
}

public StatsAnomalyDetectorResponse(ADStatsResponse adStatsResponse) {
this.adStatsResponse = adStatsResponse;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
adStatsResponse.writeTo(out);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
adStatsResponse.toXContent(builder, params);
return builder;
}
}
Loading