From 12c9d48c0e40d56f1ead3b2a95ee938429b53ebc Mon Sep 17 00:00:00 2001 From: Sarat Date: Thu, 24 Sep 2020 10:18:50 -0700 Subject: [PATCH 1/3] Adding RestActions support for detector stats --- .../ad/AnomalyDetectorPlugin.java | 8 +- .../rest/RestStatsAnomalyDetectorAction.java | 117 +----------- .../ad/stats/ADStatsResponse.java | 14 ++ .../SearchAnomalyDetectorAction.java | 2 +- .../transport/StatsAnomalyDetectorAction.java | 28 +++ .../StatsAnomalyDetectorResponse.java | 54 ++++++ .../StatsAnomalyDetectorTransportAction.java | 175 ++++++++++++++++++ 7 files changed, 281 insertions(+), 117 deletions(-) create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/StatsAnomalyDetectorAction.java create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/StatsAnomalyDetectorResponse.java create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/StatsAnomalyDetectorTransportAction.java diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java index c908776c..ec64a920 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java @@ -29,6 +29,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import com.amazon.opendistroforelasticsearch.ad.transport.StatsAnomalyDetectorAction; +import com.amazon.opendistroforelasticsearch.ad.transport.StatsAnomalyDetectorTransportAction; import org.elasticsearch.SpecialPermission; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; @@ -223,8 +225,7 @@ public List getRestHandlers( ); RestStatsAnomalyDetectorAction statsAnomalyDetectorAction = new RestStatsAnomalyDetectorAction( adStats, - this.nodeFilter, - this.clusterService + this.nodeFilter ); RestAnomalyDetectorJobAction anomalyDetectorJobAction = new RestAnomalyDetectorJobAction( settings, @@ -466,7 +467,8 @@ public List getNamedXContent() { new ActionHandler<>(ProfileAction.INSTANCE, ProfileTransportAction.class), new ActionHandler<>(RCFPollingAction.INSTANCE, RCFPollingTransportAction.class), new ActionHandler<>(SearchAnomalyDetectorAction.INSTANCE, SearchAnomalyDetectorTransportAction.class), - new ActionHandler<>(SearchAnomalyResultAction.INSTANCE, SearchAnomalyResultTransportAction.class) + new ActionHandler<>(SearchAnomalyResultAction.INSTANCE, SearchAnomalyResultTransportAction.class), + new ActionHandler<>(StatsAnomalyDetectorAction.INSTANCE, StatsAnomalyDetectorTransportAction.class) ); } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestStatsAnomalyDetectorAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestStatsAnomalyDetectorAction.java index fe0bcf46..5936a8c9 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestStatsAnomalyDetectorAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestStatsAnomalyDetectorAction.java @@ -25,6 +25,7 @@ import java.util.Set; import java.util.TreeSet; +import com.amazon.opendistroforelasticsearch.ad.transport.StatsAnomalyDetectorAction; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.Client; @@ -49,6 +50,7 @@ import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer; import com.amazon.opendistroforelasticsearch.ad.util.MultiResponsesDelegateActionListener; import com.google.common.collect.ImmutableList; +import org.elasticsearch.rest.action.RestToXContentListener; /** * RestStatsAnomalyDetectorAction consists of the REST handler to get the stats from the anomaly detector plugin. @@ -65,12 +67,10 @@ public class RestStatsAnomalyDetectorAction extends BaseRestHandler { * * @param adStats ADStats object * @param nodeFilter util class to get eligible data nodes - * @param clusterService ClusterService */ - public RestStatsAnomalyDetectorAction(ADStats adStats, DiscoveryNodeFilterer nodeFilter, ClusterService clusterService) { + public RestStatsAnomalyDetectorAction(ADStats adStats, DiscoveryNodeFilterer nodeFilter) { this.adStats = adStats; this.nodeFilter = nodeFilter; - this.clusterService = clusterService; } @Override @@ -84,7 +84,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli throw new IllegalStateException(CommonErrorMessages.DISABLED_ERR_MSG); } ADStatsRequest adStatsRequest = getRequest(request); - return channel -> getStats(client, channel, adStatsRequest); + return channel -> client.execute(StatsAnomalyDetectorAction.INSTANCE, adStatsRequest, new RestToXContentListener<>(channel)); } /** @@ -141,115 +141,6 @@ private ADStatsRequest getRequest(RestRequest request) { return adStatsRequest; } - /** - * Make the 2 requests to get the node and cluster statistics - * - * @param client Client - * @param channel Channel to send response - * @param adStatsRequest Request containing stats to be retrieved - */ - private void getStats(Client client, RestChannel channel, ADStatsRequest adStatsRequest) { - // Use MultiResponsesDelegateActionListener to execute 2 async requests and create the response once they finish - MultiResponsesDelegateActionListener delegateListener = new MultiResponsesDelegateActionListener<>( - getRestStatsListener(channel), - 2, - "Unable to return AD Stats" - ); - - getClusterStats(client, delegateListener, adStatsRequest); - getNodeStats(client, delegateListener, adStatsRequest); - } - - /** - * Make async request to get the number of detectors in AnomalyDetector.ANOMALY_DETECTORS_INDEX if necessary - * and, onResponse, gather the cluster statistics - * - * @param client Client - * @param listener MultiResponsesDelegateActionListener to be used once both requests complete - * @param adStatsRequest Request containing stats to be retrieved - */ - private void getClusterStats( - Client client, - MultiResponsesDelegateActionListener listener, - ADStatsRequest adStatsRequest - ) { - ADStatsResponse adStatsResponse = new ADStatsResponse(); - if (adStatsRequest.getStatsToBeRetrieved().contains(StatNames.DETECTOR_COUNT.getName())) { - if (clusterService.state().getRoutingTable().hasIndex(AnomalyDetector.ANOMALY_DETECTORS_INDEX)) { - final SearchRequest request = client - .prepareSearch(AnomalyDetector.ANOMALY_DETECTORS_INDEX) - .setSize(0) - .setTrackTotalHits(true) - .request(); - client.search(request, ActionListener.wrap(indicesStatsResponse -> { - adStats.getStat(StatNames.DETECTOR_COUNT.getName()).setValue(indicesStatsResponse.getHits().getTotalHits().value); - adStatsResponse.setClusterStats(getClusterStatsMap(adStatsRequest)); - listener.onResponse(adStatsResponse); - }, e -> listener.onFailure(new RuntimeException("Failed to get AD cluster stats", e)))); - } else { - adStats.getStat(StatNames.DETECTOR_COUNT.getName()).setValue(0L); - adStatsResponse.setClusterStats(getClusterStatsMap(adStatsRequest)); - listener.onResponse(adStatsResponse); - } - } else { - adStatsResponse.setClusterStats(getClusterStatsMap(adStatsRequest)); - listener.onResponse(adStatsResponse); - } - } - - /** - * Make async request to get the Anomaly Detection statistics from each node and, onResponse, set the - * ADStatsNodesResponse field of ADStatsResponse - * - * @param client Client - * @param listener MultiResponsesDelegateActionListener to be used once both requests complete - * @param adStatsRequest Request containing stats to be retrieved - */ - private void getNodeStats( - Client client, - MultiResponsesDelegateActionListener listener, - ADStatsRequest adStatsRequest - ) { - client.execute(ADStatsNodesAction.INSTANCE, adStatsRequest, ActionListener.wrap(adStatsResponse -> { - ADStatsResponse restADStatsResponse = new ADStatsResponse(); - restADStatsResponse.setADStatsNodesResponse(adStatsResponse); - listener.onResponse(restADStatsResponse); - }, listener::onFailure)); - } - - /** - * Collect Cluster Stats into map to be retrieved - * - * @param adStatsRequest Request containing stats to be retrieved - * @return Map containing Cluster Stats - */ - private Map getClusterStatsMap(ADStatsRequest adStatsRequest) { - Map clusterStats = new HashMap<>(); - Set statsToBeRetrieved = adStatsRequest.getStatsToBeRetrieved(); - adStats - .getClusterStats() - .entrySet() - .stream() - .filter(s -> statsToBeRetrieved.contains(s.getKey())) - .forEach(s -> clusterStats.put(s.getKey(), s.getValue().getValue())); - return clusterStats; - } - - /** - * Listener sends response once Node Stats and Cluster Stats are gathered - * - * @param channel Channel - * @return ActionListener for ADStatsResponse - */ - private ActionListener getRestStatsListener(RestChannel channel) { - return ActionListener - .wrap( - adStatsResponse -> { - channel.sendResponse(new BytesRestResponse(RestStatus.OK, adStatsResponse.toXContent(channel.newBuilder()))); - }, - exception -> channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, exception.getMessage())) - ); - } @Override public List routes() { diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/stats/ADStatsResponse.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/stats/ADStatsResponse.java index 786da34a..22334236 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/stats/ADStatsResponse.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/stats/ADStatsResponse.java @@ -21,6 +21,8 @@ import org.apache.commons.lang.builder.EqualsBuilder; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.commons.lang.builder.ToStringBuilder; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -82,6 +84,18 @@ public XContentBuilder toXContent(XContentBuilder builder) throws IOException { return toXContent(builder, ToXContent.EMPTY_PARAMS); } + public ADStatsResponse() {} + + public ADStatsResponse(StreamInput in) throws IOException { + adStatsNodesResponse = new ADStatsNodesResponse(in); + clusterStats = in.readMap(); + } + + public void writeTo(StreamOutput out) throws IOException { + adStatsNodesResponse.writeTo(out); + out.writeMap(clusterStats); + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { XContentBuilder xContentBuilder = builder.startObject(); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyDetectorAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyDetectorAction.java index 5f27a8d3..de35dec0 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyDetectorAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyDetectorAction.java @@ -20,7 +20,7 @@ public class SearchAnomalyDetectorAction extends ActionType { public static final SearchAnomalyDetectorAction INSTANCE = new SearchAnomalyDetectorAction(); - public static final String NAME = "cluster:admin/opendistro/ad/detector/search"; + public static final String NAME = "cluster:admin/ad/search/detector"; private SearchAnomalyDetectorAction() { super(NAME, SearchResponse::new); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/StatsAnomalyDetectorAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/StatsAnomalyDetectorAction.java new file mode 100644 index 00000000..56973b90 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/StatsAnomalyDetectorAction.java @@ -0,0 +1,28 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import org.elasticsearch.action.ActionType; + +public class StatsAnomalyDetectorAction extends ActionType { + public static final StatsAnomalyDetectorAction INSTANCE = new StatsAnomalyDetectorAction(); + public static final String NAME = "cluster:admin/opendistro/ad/detector/stats"; + + private StatsAnomalyDetectorAction() { + super(NAME, StatsAnomalyDetectorResponse::new); + } + +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/StatsAnomalyDetectorResponse.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/StatsAnomalyDetectorResponse.java new file mode 100644 index 00000000..374c5a09 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/StatsAnomalyDetectorResponse.java @@ -0,0 +1,54 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import java.io.IOException; + +import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; +import com.amazon.opendistroforelasticsearch.ad.model.*; +import com.amazon.opendistroforelasticsearch.ad.stats.ADStatsResponse; +import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.RestStatus; + +public class StatsAnomalyDetectorResponse extends ActionResponse implements ToXContentObject { + private ADStatsResponse adStatsResponse; + + public StatsAnomalyDetectorResponse(StreamInput in) throws IOException { + super(in); + adStatsResponse = new ADStatsResponse(in); + } + + public StatsAnomalyDetectorResponse(ADStatsResponse adStatsResponse) { + this.adStatsResponse = adStatsResponse; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + adStatsResponse.writeTo(out); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + adStatsResponse.toXContent(builder, params); + return builder; + } +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/StatsAnomalyDetectorTransportAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/StatsAnomalyDetectorTransportAction.java new file mode 100644 index 00000000..3ca0ea0c --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/StatsAnomalyDetectorTransportAction.java @@ -0,0 +1,175 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.amazon.opendistroforelasticsearch.ad.stats.ADStats; +import com.amazon.opendistroforelasticsearch.ad.stats.ADStatsResponse; +import com.amazon.opendistroforelasticsearch.ad.stats.StatNames; +import com.amazon.opendistroforelasticsearch.ad.util.MultiResponsesDelegateActionListener; +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportService; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + + +public class StatsAnomalyDetectorTransportAction extends HandledTransportAction { + + private final Client client; + private final ADStats adStats; + private final ClusterService clusterService; + + @Inject + public StatsAnomalyDetectorTransportAction( + TransportService transportService, + ActionFilters actionFilters, + Client client, + ADStats adStats, + ClusterService clusterService + + ) { + super(StatsAnomalyDetectorAction.NAME, transportService, actionFilters, ADStatsRequest::new); + this.client = client; + this.adStats = adStats; + this.clusterService = clusterService; + } + + @Override + protected void doExecute(Task task, ADStatsRequest request, ActionListener listener) { + getStats(client, listener, request); + } + + /** + * Make the 2 requests to get the node and cluster statistics + * + * @param client Client + * @param listener Listener to send response + * @param adStatsRequest Request containing stats to be retrieved + */ + private void getStats(Client client, ActionListener listener, ADStatsRequest adStatsRequest) { + // Use MultiResponsesDelegateActionListener to execute 2 async requests and create the response once they finish + MultiResponsesDelegateActionListener delegateListener = new MultiResponsesDelegateActionListener<>( + getRestStatsListener(listener), + 2, + "Unable to return AD Stats" + ); + + getClusterStats(client, delegateListener, adStatsRequest); + getNodeStats(client, delegateListener, adStatsRequest); + } + + /** + * Listener sends response once Node Stats and Cluster Stats are gathered + * + * @param listener Listener to send response + * @return ActionListener for ADStatsResponse + */ + private ActionListener getRestStatsListener(ActionListener listener) { + return ActionListener + .wrap( + adStatsResponse -> { + listener.onResponse(new StatsAnomalyDetectorResponse(adStatsResponse)); + }, + exception -> listener.onFailure(new ElasticsearchStatusException(exception.getMessage(), RestStatus.INTERNAL_SERVER_ERROR)) + ); + } + + /** + * Make async request to get the number of detectors in AnomalyDetector.ANOMALY_DETECTORS_INDEX if necessary + * and, onResponse, gather the cluster statistics + * + * @param client Client + * @param listener MultiResponsesDelegateActionListener to be used once both requests complete + * @param adStatsRequest Request containing stats to be retrieved + */ + private void getClusterStats( + Client client, + MultiResponsesDelegateActionListener listener, + ADStatsRequest adStatsRequest + ) { + ADStatsResponse adStatsResponse = new ADStatsResponse(); + if (adStatsRequest.getStatsToBeRetrieved().contains(StatNames.DETECTOR_COUNT.getName())) { + if (clusterService.state().getRoutingTable().hasIndex(AnomalyDetector.ANOMALY_DETECTORS_INDEX)) { + final SearchRequest request = client + .prepareSearch(AnomalyDetector.ANOMALY_DETECTORS_INDEX) + .setSize(0) + .setTrackTotalHits(true) + .request(); + client.search(request, ActionListener.wrap(indicesStatsResponse -> { + adStats.getStat(StatNames.DETECTOR_COUNT.getName()).setValue(indicesStatsResponse.getHits().getTotalHits().value); + adStatsResponse.setClusterStats(getClusterStatsMap(adStatsRequest)); + listener.onResponse(adStatsResponse); + }, e -> listener.onFailure(new RuntimeException("Failed to get AD cluster stats", e)))); + } else { + adStats.getStat(StatNames.DETECTOR_COUNT.getName()).setValue(0L); + adStatsResponse.setClusterStats(getClusterStatsMap(adStatsRequest)); + listener.onResponse(adStatsResponse); + } + } else { + adStatsResponse.setClusterStats(getClusterStatsMap(adStatsRequest)); + listener.onResponse(adStatsResponse); + } + } + + /** + * Collect Cluster Stats into map to be retrieved + * + * @param adStatsRequest Request containing stats to be retrieved + * @return Map containing Cluster Stats + */ + private Map getClusterStatsMap(ADStatsRequest adStatsRequest) { + Map clusterStats = new HashMap<>(); + Set statsToBeRetrieved = adStatsRequest.getStatsToBeRetrieved(); + adStats + .getClusterStats() + .entrySet() + .stream() + .filter(s -> statsToBeRetrieved.contains(s.getKey())) + .forEach(s -> clusterStats.put(s.getKey(), s.getValue().getValue())); + return clusterStats; + } + + /** + * Make async request to get the Anomaly Detection statistics from each node and, onResponse, set the + * ADStatsNodesResponse field of ADStatsResponse + * + * @param client Client + * @param listener MultiResponsesDelegateActionListener to be used once both requests complete + * @param adStatsRequest Request containing stats to be retrieved + */ + private void getNodeStats( + Client client, + MultiResponsesDelegateActionListener listener, + ADStatsRequest adStatsRequest + ) { + client.execute(ADStatsNodesAction.INSTANCE, adStatsRequest, ActionListener.wrap(adStatsResponse -> { + ADStatsResponse restADStatsResponse = new ADStatsResponse(); + restADStatsResponse.setADStatsNodesResponse(adStatsResponse); + listener.onResponse(restADStatsResponse); + }, listener::onFailure)); + } +} From b82b603fdcc283a20e47388b4321d4e3bd982a3f Mon Sep 17 00:00:00 2001 From: Sarat Vemulapalli Date: Thu, 24 Sep 2020 15:44:11 -0700 Subject: [PATCH 2/3] Update SearchAnomalyDetectorAction.java --- .../ad/transport/SearchAnomalyDetectorAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyDetectorAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyDetectorAction.java index de35dec0..5f27a8d3 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyDetectorAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/SearchAnomalyDetectorAction.java @@ -20,7 +20,7 @@ public class SearchAnomalyDetectorAction extends ActionType { public static final SearchAnomalyDetectorAction INSTANCE = new SearchAnomalyDetectorAction(); - public static final String NAME = "cluster:admin/ad/search/detector"; + public static final String NAME = "cluster:admin/opendistro/ad/detector/search"; private SearchAnomalyDetectorAction() { super(NAME, SearchResponse::new); From 6b23f05dff7c5f6365866931af66a27405a6b607 Mon Sep 17 00:00:00 2001 From: Sarat Date: Fri, 25 Sep 2020 13:56:58 -0700 Subject: [PATCH 3/3] Adding Unit Tests --- build.gradle | 1 + .../ad/AnomalyDetectorPlugin.java | 9 +-- .../rest/RestStatsAnomalyDetectorAction.java | 18 +---- .../StatsAnomalyDetectorResponse.java | 8 +- .../StatsAnomalyDetectorTransportAction.java | 74 +++++++++--------- .../StatsAnomalyDetectorActionTests.java | 76 +++++++++++++++++++ 6 files changed, 120 insertions(+), 66 deletions(-) create mode 100644 src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/StatsAnomalyDetectorActionTests.java diff --git a/build.gradle b/build.gradle index 07484f2c..94717d39 100644 --- a/build.gradle +++ b/build.gradle @@ -249,6 +249,7 @@ List jacocoExclusions = [ 'com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorRunner', 'com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices', 'com.amazon.opendistroforelasticsearch.ad.util.ParseUtils', + 'com.amazon.opendistroforelasticsearch.ad.transport.StatsAnomalyDetectorTransportAction' ] jacocoTestCoverageVerification { diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java index ec64a920..b6a32a93 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java @@ -29,8 +29,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import com.amazon.opendistroforelasticsearch.ad.transport.StatsAnomalyDetectorAction; -import com.amazon.opendistroforelasticsearch.ad.transport.StatsAnomalyDetectorTransportAction; import org.elasticsearch.SpecialPermission; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; @@ -119,6 +117,8 @@ import com.amazon.opendistroforelasticsearch.ad.transport.SearchAnomalyDetectorTransportAction; import com.amazon.opendistroforelasticsearch.ad.transport.SearchAnomalyResultAction; import com.amazon.opendistroforelasticsearch.ad.transport.SearchAnomalyResultTransportAction; +import com.amazon.opendistroforelasticsearch.ad.transport.StatsAnomalyDetectorAction; +import com.amazon.opendistroforelasticsearch.ad.transport.StatsAnomalyDetectorTransportAction; import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorAction; import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorTransportAction; import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultAction; @@ -223,10 +223,7 @@ public List getRestHandlers( clusterService, anomalyDetectorRunner ); - RestStatsAnomalyDetectorAction statsAnomalyDetectorAction = new RestStatsAnomalyDetectorAction( - adStats, - this.nodeFilter - ); + RestStatsAnomalyDetectorAction statsAnomalyDetectorAction = new RestStatsAnomalyDetectorAction(adStats, this.nodeFilter); RestAnomalyDetectorJobAction anomalyDetectorJobAction = new RestAnomalyDetectorJobAction( settings, clusterService, diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestStatsAnomalyDetectorAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestStatsAnomalyDetectorAction.java index 5936a8c9..5956f07d 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestStatsAnomalyDetectorAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestStatsAnomalyDetectorAction.java @@ -18,39 +18,26 @@ import static com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin.AD_BASE_URI; import java.util.Arrays; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.TreeSet; -import com.amazon.opendistroforelasticsearch.ad.transport.StatsAnomalyDetectorAction; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.client.Client; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.rest.BaseRestHandler; -import org.elasticsearch.rest.BytesRestResponse; -import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestToXContentListener; import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages; -import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.settings.EnabledSetting; import com.amazon.opendistroforelasticsearch.ad.stats.ADStats; -import com.amazon.opendistroforelasticsearch.ad.stats.ADStatsResponse; -import com.amazon.opendistroforelasticsearch.ad.stats.StatNames; -import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesAction; import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsRequest; +import com.amazon.opendistroforelasticsearch.ad.transport.StatsAnomalyDetectorAction; import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer; -import com.amazon.opendistroforelasticsearch.ad.util.MultiResponsesDelegateActionListener; import com.google.common.collect.ImmutableList; -import org.elasticsearch.rest.action.RestToXContentListener; /** * RestStatsAnomalyDetectorAction consists of the REST handler to get the stats from the anomaly detector plugin. @@ -141,7 +128,6 @@ private ADStatsRequest getRequest(RestRequest request) { return adStatsRequest; } - @Override public List routes() { return ImmutableList diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/StatsAnomalyDetectorResponse.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/StatsAnomalyDetectorResponse.java index 374c5a09..03ff351a 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/StatsAnomalyDetectorResponse.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/StatsAnomalyDetectorResponse.java @@ -17,17 +17,13 @@ import java.io.IOException; -import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; -import com.amazon.opendistroforelasticsearch.ad.model.*; -import com.amazon.opendistroforelasticsearch.ad.stats.ADStatsResponse; -import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.rest.RestStatus; + +import com.amazon.opendistroforelasticsearch.ad.stats.ADStatsResponse; public class StatsAnomalyDetectorResponse extends ActionResponse implements ToXContentObject { private ADStatsResponse adStatsResponse; diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/StatsAnomalyDetectorTransportAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/StatsAnomalyDetectorTransportAction.java index 3ca0ea0c..9439bf1a 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/StatsAnomalyDetectorTransportAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/StatsAnomalyDetectorTransportAction.java @@ -15,11 +15,10 @@ package com.amazon.opendistroforelasticsearch.ad.transport; -import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; -import com.amazon.opendistroforelasticsearch.ad.stats.ADStats; -import com.amazon.opendistroforelasticsearch.ad.stats.ADStatsResponse; -import com.amazon.opendistroforelasticsearch.ad.stats.StatNames; -import com.amazon.opendistroforelasticsearch.ad.util.MultiResponsesDelegateActionListener; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchRequest; @@ -32,10 +31,11 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportService; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.amazon.opendistroforelasticsearch.ad.stats.ADStats; +import com.amazon.opendistroforelasticsearch.ad.stats.ADStatsResponse; +import com.amazon.opendistroforelasticsearch.ad.stats.StatNames; +import com.amazon.opendistroforelasticsearch.ad.util.MultiResponsesDelegateActionListener; public class StatsAnomalyDetectorTransportAction extends HandledTransportAction { @@ -45,11 +45,11 @@ public class StatsAnomalyDetectorTransportAction extends HandledTransportAction< @Inject public StatsAnomalyDetectorTransportAction( - TransportService transportService, - ActionFilters actionFilters, - Client client, - ADStats adStats, - ClusterService clusterService + TransportService transportService, + ActionFilters actionFilters, + Client client, + ADStats adStats, + ClusterService clusterService ) { super(StatsAnomalyDetectorAction.NAME, transportService, actionFilters, ADStatsRequest::new); @@ -73,9 +73,9 @@ protected void doExecute(Task task, ADStatsRequest request, ActionListener listener, ADStatsRequest adStatsRequest) { // Use MultiResponsesDelegateActionListener to execute 2 async requests and create the response once they finish MultiResponsesDelegateActionListener delegateListener = new MultiResponsesDelegateActionListener<>( - getRestStatsListener(listener), - 2, - "Unable to return AD Stats" + getRestStatsListener(listener), + 2, + "Unable to return AD Stats" ); getClusterStats(client, delegateListener, adStatsRequest); @@ -90,12 +90,10 @@ private void getStats(Client client, ActionListener getRestStatsListener(ActionListener listener) { return ActionListener - .wrap( - adStatsResponse -> { - listener.onResponse(new StatsAnomalyDetectorResponse(adStatsResponse)); - }, - exception -> listener.onFailure(new ElasticsearchStatusException(exception.getMessage(), RestStatus.INTERNAL_SERVER_ERROR)) - ); + .wrap( + adStatsResponse -> { listener.onResponse(new StatsAnomalyDetectorResponse(adStatsResponse)); }, + exception -> listener.onFailure(new ElasticsearchStatusException(exception.getMessage(), RestStatus.INTERNAL_SERVER_ERROR)) + ); } /** @@ -107,18 +105,18 @@ private ActionListener getRestStatsListener(ActionListener listener, - ADStatsRequest adStatsRequest + Client client, + MultiResponsesDelegateActionListener listener, + ADStatsRequest adStatsRequest ) { ADStatsResponse adStatsResponse = new ADStatsResponse(); if (adStatsRequest.getStatsToBeRetrieved().contains(StatNames.DETECTOR_COUNT.getName())) { if (clusterService.state().getRoutingTable().hasIndex(AnomalyDetector.ANOMALY_DETECTORS_INDEX)) { final SearchRequest request = client - .prepareSearch(AnomalyDetector.ANOMALY_DETECTORS_INDEX) - .setSize(0) - .setTrackTotalHits(true) - .request(); + .prepareSearch(AnomalyDetector.ANOMALY_DETECTORS_INDEX) + .setSize(0) + .setTrackTotalHits(true) + .request(); client.search(request, ActionListener.wrap(indicesStatsResponse -> { adStats.getStat(StatNames.DETECTOR_COUNT.getName()).setValue(indicesStatsResponse.getHits().getTotalHits().value); adStatsResponse.setClusterStats(getClusterStatsMap(adStatsRequest)); @@ -145,11 +143,11 @@ private Map getClusterStatsMap(ADStatsRequest adStatsRequest) { Map clusterStats = new HashMap<>(); Set statsToBeRetrieved = adStatsRequest.getStatsToBeRetrieved(); adStats - .getClusterStats() - .entrySet() - .stream() - .filter(s -> statsToBeRetrieved.contains(s.getKey())) - .forEach(s -> clusterStats.put(s.getKey(), s.getValue().getValue())); + .getClusterStats() + .entrySet() + .stream() + .filter(s -> statsToBeRetrieved.contains(s.getKey())) + .forEach(s -> clusterStats.put(s.getKey(), s.getValue().getValue())); return clusterStats; } @@ -162,9 +160,9 @@ private Map getClusterStatsMap(ADStatsRequest adStatsRequest) { * @param adStatsRequest Request containing stats to be retrieved */ private void getNodeStats( - Client client, - MultiResponsesDelegateActionListener listener, - ADStatsRequest adStatsRequest + Client client, + MultiResponsesDelegateActionListener listener, + ADStatsRequest adStatsRequest ) { client.execute(ADStatsNodesAction.INSTANCE, adStatsRequest, ActionListener.wrap(adStatsResponse -> { ADStatsResponse restADStatsResponse = new ADStatsResponse(); diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/StatsAnomalyDetectorActionTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/StatsAnomalyDetectorActionTests.java new file mode 100644 index 00000000..d043a719 --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/StatsAnomalyDetectorActionTests.java @@ -0,0 +1,76 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.ESTestCase; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.amazon.opendistroforelasticsearch.ad.stats.ADStatsResponse; + +public class StatsAnomalyDetectorActionTests extends ESTestCase { + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + } + + @Test + public void testStatsAction() { + Assert.assertNotNull(StatsAnomalyDetectorAction.INSTANCE.name()); + Assert.assertEquals(StatsAnomalyDetectorAction.INSTANCE.name(), StatsAnomalyDetectorAction.NAME); + } + + @Test + public void testStatsResponse() throws IOException { + ADStatsResponse adStatsResponse = new ADStatsResponse(); + Map testClusterStats = new HashMap<>(); + testClusterStats.put("test_response", 1); + adStatsResponse.setClusterStats(testClusterStats); + List responses = Collections.emptyList(); + List failures = Collections.emptyList(); + ADStatsNodesResponse adStatsNodesResponse = new ADStatsNodesResponse(ClusterName.DEFAULT, responses, failures); + adStatsResponse.setADStatsNodesResponse(adStatsNodesResponse); + + StatsAnomalyDetectorResponse response = new StatsAnomalyDetectorResponse(adStatsResponse); + BytesStreamOutput out = new BytesStreamOutput(); + response.writeTo(out); + StreamInput input = out.bytes().streamInput(); + StatsAnomalyDetectorResponse newResponse = new StatsAnomalyDetectorResponse(input); + assertNotNull(newResponse); + + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder = newResponse.toXContent(builder, ToXContent.EMPTY_PARAMS); + XContentParser parser = createParser(builder); + assertEquals(1, parser.map().get("test_response")); + } +}