Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Moving Preview Anomaly Detectors to Transport layer #321

Merged
merged 6 commits into from
Jan 6, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ List<String> jacocoExclusions = [
'com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorRequest',
'com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorResponse',
'com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorTransportAction',
'com.amazon.opendistroforelasticsearch.ad.transport.PreviewAnomalyDetectorTransportAction*',
saratvemulapalli marked this conversation as resolved.
Show resolved Hide resolved

// TODO: hc caused coverage to drop
'com.amazon.opendistroforelasticsearch.ad.NodeStateManager',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import com.amazon.opendistroforelasticsearch.ad.rest.RestExecuteAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestGetAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestIndexAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestPreviewAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestSearchAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestSearchAnomalyDetectorInfoAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestSearchAnomalyResultAction;
Expand Down Expand Up @@ -129,6 +130,8 @@
import com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.PreviewAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.transport.PreviewAnomalyDetectorTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.RCFPollingAction;
Expand Down Expand Up @@ -232,14 +235,11 @@ public List<RestHandler> getRestHandlers(
RestSearchAnomalyDetectorAction searchAnomalyDetectorAction = new RestSearchAnomalyDetectorAction();
RestSearchAnomalyResultAction searchAnomalyResultAction = new RestSearchAnomalyResultAction();
RestDeleteAnomalyDetectorAction deleteAnomalyDetectorAction = new RestDeleteAnomalyDetectorAction();
RestExecuteAnomalyDetectorAction executeAnomalyDetectorAction = new RestExecuteAnomalyDetectorAction(
settings,
clusterService,
anomalyDetectorRunner
);
RestExecuteAnomalyDetectorAction executeAnomalyDetectorAction = new RestExecuteAnomalyDetectorAction(settings, clusterService);
RestStatsAnomalyDetectorAction statsAnomalyDetectorAction = new RestStatsAnomalyDetectorAction(adStats, this.nodeFilter);
RestAnomalyDetectorJobAction anomalyDetectorJobAction = new RestAnomalyDetectorJobAction(settings, clusterService);
RestSearchAnomalyDetectorInfoAction searchAnomalyDetectorInfoAction = new RestSearchAnomalyDetectorInfoAction();
RestPreviewAnomalyDetectorAction previewAnomalyDetectorAction = new RestPreviewAnomalyDetectorAction();

return ImmutableList
.of(
Expand All @@ -251,7 +251,8 @@ public List<RestHandler> getRestHandlers(
executeAnomalyDetectorAction,
anomalyDetectorJobAction,
statsAnomalyDetectorAction,
searchAnomalyDetectorInfoAction
searchAnomalyDetectorInfoAction,
previewAnomalyDetectorAction
);
}

Expand Down Expand Up @@ -607,7 +608,8 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
new ActionHandler<>(ADResultBulkAction.INSTANCE, ADResultBulkTransportAction.class),
new ActionHandler<>(EntityResultAction.INSTANCE, EntityResultTransportAction.class),
new ActionHandler<>(EntityProfileAction.INSTANCE, EntityProfileTransportAction.class),
new ActionHandler<>(SearchAnomalyDetectorInfoAction.INSTANCE, SearchAnomalyDetectorInfoTransportAction.class)
new ActionHandler<>(SearchAnomalyDetectorInfoAction.INSTANCE, SearchAnomalyDetectorInfoTransportAction.class),
new ActionHandler<>(PreviewAnomalyDetectorAction.INSTANCE, PreviewAnomalyDetectorTransportAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@

package com.amazon.opendistroforelasticsearch.ad.rest;

import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.MAX_ANOMALY_FEATURES;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.REQUEST_TIMEOUT;
import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.DETECTOR_ID;
import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.PREVIEW;
import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.RUN;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

Expand All @@ -29,35 +27,23 @@
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestActionListener;
import org.elasticsearch.rest.action.RestToXContentListener;

import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin;
import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorRunner;
import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorExecutionInput;
import com.amazon.opendistroforelasticsearch.ad.settings.EnabledSetting;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.AnomalyResultRequest;
import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils;
import com.google.common.collect.ImmutableList;

/**
Expand All @@ -66,21 +52,14 @@
public class RestExecuteAnomalyDetectorAction extends BaseRestHandler {

public static final String DETECT_DATA_ACTION = "execute_anomaly_detector";
public static final String ANOMALY_RESULT = "anomaly_result";
public static final String ANOMALY_DETECTOR = "anomaly_detector";
private final AnomalyDetectorRunner anomalyDetectorRunner;
// TODO: apply timeout config
private volatile TimeValue requestTimeout;
private volatile Integer maxAnomalyFeatures;

private final Logger logger = LogManager.getLogger(RestExecuteAnomalyDetectorAction.class);

public RestExecuteAnomalyDetectorAction(Settings settings, ClusterService clusterService, AnomalyDetectorRunner anomalyDetectorRunner) {
this.anomalyDetectorRunner = anomalyDetectorRunner;
public RestExecuteAnomalyDetectorAction(Settings settings, ClusterService clusterService) {
this.requestTimeout = REQUEST_TIMEOUT.get(settings);
maxAnomalyFeatures = MAX_ANOMALY_FEATURES.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(REQUEST_TIMEOUT, it -> requestTimeout = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_ANOMALY_FEATURES, it -> maxAnomalyFeatures = it);
}

@Override
Expand All @@ -102,42 +81,15 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
return;
}

if (rawPath.endsWith(PREVIEW)) {
if (input.getDetector() != null) {
error = validateDetector(input.getDetector());
if (StringUtils.isNotBlank(error)) {
channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, error));
return;
}
anomalyDetectorRunner
.executeDetector(
input.getDetector(),
input.getPeriodStart(),
input.getPeriodEnd(),
getPreviewDetectorActionListener(channel, input.getDetector())
);
} else {
preivewAnomalyDetector(client, channel, input);
}
} else if (rawPath.endsWith(RUN)) {
AnomalyResultRequest getRequest = new AnomalyResultRequest(
input.getDetectorId(),
input.getPeriodStart().toEpochMilli(),
input.getPeriodEnd().toEpochMilli()
);
client.execute(AnomalyResultAction.INSTANCE, getRequest, new RestToXContentListener<>(channel));
}
AnomalyResultRequest getRequest = new AnomalyResultRequest(
input.getDetectorId(),
input.getPeriodStart().toEpochMilli(),
input.getPeriodEnd().toEpochMilli()
);
client.execute(AnomalyResultAction.INSTANCE, getRequest, new RestToXContentListener<>(channel));
};
}

private String validateDetector(AnomalyDetector detector) {
if (detector.getFeatureAttributes().isEmpty()) {
return "Can't preview detector without feature";
} else {
return RestHandlerUtils.validateAnomalyDetector(detector, maxAnomalyFeatures);
}
}

private AnomalyDetectorExecutionInput getAnomalyDetectorExecutionInput(RestRequest request) throws IOException {
String detectorId = null;
if (request.hasParam(DETECTOR_ID)) {
Expand Down Expand Up @@ -166,75 +118,6 @@ private String validateAdExecutionInput(AnomalyDetectorExecutionInput input) {
return null;
}

private void preivewAnomalyDetector(NodeClient client, RestChannel channel, AnomalyDetectorExecutionInput input) {
if (!StringUtils.isBlank(input.getDetectorId())) {
GetRequest getRequest = new GetRequest(AnomalyDetector.ANOMALY_DETECTORS_INDEX).id(input.getDetectorId());
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
client.get(getRequest, onGetAnomalyDetectorResponse(channel, input));
} catch (Exception e) {
logger.error("Fail to get detector for preview", e);
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
}
} else {
channel.sendResponse(new BytesRestResponse(RestStatus.NOT_FOUND, "Wrong input, no detector id"));
}
}

private RestActionListener<GetResponse> onGetAnomalyDetectorResponse(RestChannel channel, AnomalyDetectorExecutionInput input) {
return new RestActionListener<GetResponse>(channel) {
@Override
protected void processResponse(GetResponse response) throws Exception {
if (!response.isExists()) {
XContentBuilder message = channel
.newErrorBuilder()
.startObject()
.field("message", "Can't find anomaly detector with id:" + response.getId())
.endObject();
channel.sendResponse(new BytesRestResponse(RestStatus.NOT_FOUND, message));
return;
}
XContentParser parser = XContentType.JSON
.xContent()
.createParser(
channel.request().getXContentRegistry(),
LoggingDeprecationHandler.INSTANCE,
response.getSourceAsBytesRef().streamInput()
);

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
AnomalyDetector detector = AnomalyDetector.parse(parser, response.getId(), response.getVersion());

anomalyDetectorRunner
.executeDetector(
detector,
input.getPeriodStart(),
input.getPeriodEnd(),
getPreviewDetectorActionListener(channel, detector)
);
}
};
}

private ActionListener getPreviewDetectorActionListener(RestChannel channel, AnomalyDetector detector) {
return ActionListener.wrap(anomalyResult -> {
XContentBuilder builder = channel
.newBuilder()
.startObject()
.field(ANOMALY_RESULT, anomalyResult)
.field(ANOMALY_DETECTOR, detector)
.endObject();
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {
logger.error("Unexpected error running anomaly detector " + detector.getDetectorId(), exception);
try {
XContentBuilder builder = channel.newBuilder().startObject().field(ANOMALY_DETECTOR, detector).endObject();
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, builder));
} catch (IOException e) {
logger.error("Fail to send back exception message" + detector.getDetectorId(), exception);
}
});
}

@Override
public List<Route> routes() {
return ImmutableList
Expand All @@ -243,11 +126,6 @@ public List<Route> routes() {
new Route(
RestRequest.Method.POST,
String.format(Locale.ROOT, "%s/{%s}/%s", AnomalyDetectorPlugin.AD_BASE_DETECTORS_URI, DETECTOR_ID, RUN)
),
// preview detector
new Route(
RestRequest.Method.POST,
String.format(Locale.ROOT, "%s/{%s}/%s", AnomalyDetectorPlugin.AD_BASE_DETECTORS_URI, DETECTOR_ID, PREVIEW)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.rest;

import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.PREVIEW;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

import java.io.IOException;
import java.util.List;
import java.util.Locale;

import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestToXContentListener;

import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin;
import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorExecutionInput;
import com.amazon.opendistroforelasticsearch.ad.settings.EnabledSetting;
import com.amazon.opendistroforelasticsearch.ad.transport.PreviewAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.transport.PreviewAnomalyDetectorRequest;
import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils;
import com.google.common.collect.ImmutableList;

public class RestPreviewAnomalyDetectorAction extends BaseRestHandler {

public static final String PREVIEW_ANOMALY_DETECTOR_ACTION = "preview_anomaly_detector";

private static final Logger logger = LogManager.getLogger(RestPreviewAnomalyDetectorAction.class);

public RestPreviewAnomalyDetectorAction() {}

@Override
public String getName() {
return PREVIEW_ANOMALY_DETECTOR_ACTION;
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, org.elasticsearch.client.node.NodeClient client) throws IOException {
if (!EnabledSetting.isADPluginEnabled()) {
throw new IllegalStateException(CommonErrorMessages.DISABLED_ERR_MSG);
}

AnomalyDetectorExecutionInput input = getAnomalyDetectorExecutionInput(request);

return channel -> {
String rawPath = request.rawPath();
String error = validateAdExecutionInput(input);
if (StringUtils.isNotBlank(error)) {
channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, error));
return;
}
PreviewAnomalyDetectorRequest previewRequest = new PreviewAnomalyDetectorRequest(
input.getDetector(),
input.getDetectorId(),
input.getPeriodStart(),
input.getPeriodEnd()
);
client.execute(PreviewAnomalyDetectorAction.INSTANCE, previewRequest, new RestToXContentListener<>(channel));
};
}

private AnomalyDetectorExecutionInput getAnomalyDetectorExecutionInput(RestRequest request) throws IOException {
String detectorId = null;
if (request.hasParam(RestHandlerUtils.DETECTOR_ID)) {
detectorId = request.param(RestHandlerUtils.DETECTOR_ID);
}

XContentParser parser = request.contentParser();
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
AnomalyDetectorExecutionInput input = AnomalyDetectorExecutionInput.parse(parser, detectorId);
if (detectorId != null) {
input.setDetectorId(detectorId);
}
return input;
}

private String validateAdExecutionInput(AnomalyDetectorExecutionInput input) {
if (StringUtils.isBlank(input.getDetectorId())) {
return "Must set anomaly detector id";
}
if (input.getPeriodStart() == null || input.getPeriodEnd() == null) {
return "Must set both period start and end date with epoch of milliseconds";
}
if (!input.getPeriodStart().isBefore(input.getPeriodEnd())) {
return "Period start date should be before end date";
}
return null;
}

@Override
public List<RestHandler.Route> routes() {
return ImmutableList
.of(
// preview detector
new Route(
RestRequest.Method.POST,
String
.format(
Locale.ROOT,
"%s/{%s}/%s",
AnomalyDetectorPlugin.AD_BASE_DETECTORS_URI,
RestHandlerUtils.DETECTOR_ID,
PREVIEW
)
)
);
}
}
Loading