Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Adding RestActions support Create/Update Detector API (#243)
Browse files Browse the repository at this point in the history
* Adding RestActions support Create Detector API
  • Loading branch information
saratvemulapalli authored Oct 13, 2020
1 parent de644de commit ebf145a
Show file tree
Hide file tree
Showing 12 changed files with 638 additions and 96 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ List<String> jacocoExclusions = [
'com.amazon.opendistroforelasticsearch.ad.transport.StatsAnomalyDetectorTransportAction',
'com.amazon.opendistroforelasticsearch.ad.transport.DeleteAnomalyDetectorTransportAction*',
'com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorTransportAction*',
'com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorResponse'
'com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorResponse',
'com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorRequest'
]

jacocoTestCoverageVerification {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@
import com.amazon.opendistroforelasticsearch.ad.transport.DeleteModelTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.RCFPollingAction;
Expand Down Expand Up @@ -471,7 +473,8 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
new ActionHandler<>(SearchAnomalyResultAction.INSTANCE, SearchAnomalyResultTransportAction.class),
new ActionHandler<>(StatsAnomalyDetectorAction.INSTANCE, StatsAnomalyDetectorTransportAction.class),
new ActionHandler<>(DeleteAnomalyDetectorAction.INSTANCE, DeleteAnomalyDetectorTransportAction.class),
new ActionHandler<>(GetAnomalyDetectorAction.INSTANCE, GetAnomalyDetectorTransportAction.class)
new ActionHandler<>(GetAnomalyDetectorAction.INSTANCE, GetAnomalyDetectorTransportAction.class),
new ActionHandler<>(IndexAnomalyDetectorAction.INSTANCE, IndexAnomalyDetectorTransportAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,25 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestResponseListener;

import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin;
import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages;
import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.rest.handler.IndexAnomalyDetectorActionHandler;
import com.amazon.opendistroforelasticsearch.ad.settings.EnabledSetting;
import com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorRequest;
import com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorResponse;
import com.google.common.collect.ImmutableList;

/**
Expand Down Expand Up @@ -115,22 +123,19 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
WriteRequest.RefreshPolicy refreshPolicy = request.hasParam(REFRESH)
? WriteRequest.RefreshPolicy.parse(request.param(REFRESH))
: WriteRequest.RefreshPolicy.IMMEDIATE;
RestRequest.Method method = request.getHttpRequest().method();

return channel -> new IndexAnomalyDetectorActionHandler(
settings,
clusterService,
client,
channel,
anomalyDetectionIndices,
IndexAnomalyDetectorRequest indexAnomalyDetectorRequest = new IndexAnomalyDetectorRequest(
detectorId,
seqNo,
primaryTerm,
refreshPolicy,
detector,
requestTimeout,
maxAnomalyDetectors,
maxAnomalyFeatures
).start();
method
);

return channel -> client
.execute(IndexAnomalyDetectorAction.INSTANCE, indexAnomalyDetectorRequest, indexAnomalyDetectorResponse(channel, method));
}

@Override
Expand All @@ -146,4 +151,28 @@ public List<Route> routes() {
)
);
}

private RestResponseListener<IndexAnomalyDetectorResponse> indexAnomalyDetectorResponse(
RestChannel channel,
RestRequest.Method method
) {
return new RestResponseListener<IndexAnomalyDetectorResponse>(channel) {
@Override
public RestResponse buildResponse(IndexAnomalyDetectorResponse response) throws Exception {
RestStatus restStatus = RestStatus.CREATED;
if (method == RestRequest.Method.PUT) {
restStatus = RestStatus.OK;
}
BytesRestResponse bytesRestResponse = new BytesRestResponse(
restStatus,
response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS)
);
if (restStatus == RestStatus.CREATED) {
String location = String.format(Locale.ROOT, "%s/%s", AnomalyDetectorPlugin.AD_BASE_URI, response.getId());
bytesRestResponse.addHeader("Location", location);
}
return bytesRestResponse;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestStatus;

import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob;
Expand All @@ -49,47 +49,58 @@ public class AnomalyDetectorActionHandler {
* @param clusterService ES cluster service
* @param client ES node client
* @param detectorId detector identifier
* @param channel ES rest channel
* @param listener Listener to send response
* @param function AD function
* @param xContentRegistry Registry which is used for XContentParser
*/
public void getDetectorJob(
ClusterService clusterService,
NodeClient client,
Client client,
String detectorId,
RestChannel channel,
AnomalyDetectorFunction function
ActionListener listener,
AnomalyDetectorFunction function,
NamedXContentRegistry xContentRegistry
) {
if (clusterService.state().metadata().indices().containsKey(ANOMALY_DETECTOR_JOB_INDEX)) {
GetRequest request = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX).id(detectorId);
client.get(request, ActionListener.wrap(response -> onGetAdJobResponseForWrite(response, channel, function), exception -> {
logger.error("Fail to get anomaly detector job: " + detectorId, exception);
try {
channel.sendResponse(new BytesRestResponse(channel, exception));
} catch (IOException e) {
logger.error("Fail to send exception" + detectorId, e);
}
}));
client
.get(
request,
ActionListener
.wrap(response -> onGetAdJobResponseForWrite(response, listener, function, xContentRegistry), exception -> {
logger.error("Fail to get anomaly detector job: " + detectorId, exception);
listener.onFailure(exception);
})
);
} else {
function.execute();
}
}

private void onGetAdJobResponseForWrite(GetResponse response, RestChannel channel, AnomalyDetectorFunction function) {
private void onGetAdJobResponseForWrite(
GetResponse response,
ActionListener listener,
AnomalyDetectorFunction function,
NamedXContentRegistry xContentRegistry
) {
if (response.isExists()) {
String adJobId = response.getId();
if (adJobId != null) {
// check if AD job is running on the detector, if yes, we can't delete the detector
try (XContentParser parser = RestHandlerUtils.createXContentParser(channel, response.getSourceAsBytesRef())) {
try (
XContentParser parser = RestHandlerUtils
.createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
AnomalyDetectorJob adJob = AnomalyDetectorJob.parse(parser);
if (adJob.isEnabled()) {
channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, "Detector job is running: " + adJobId));
listener.onFailure(new ElasticsearchStatusException("Detector job is running: " + adJobId, RestStatus.BAD_REQUEST));
return;
}
} catch (IOException e) {
String message = "Failed to parse anomaly detector job " + adJobId;
logger.error(message, e);
channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, message));
listener.onFailure(new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST));
}
}
}
Expand Down
Loading

0 comments on commit ebf145a

Please sign in to comment.