Skip to content

Commit

Permalink
[ML] Fix simultaneous stop and force stop datafeed
Browse files Browse the repository at this point in the history
If a datafeed is stopped normally and force stopped at the same
time then it is possible that the force stop removes the
persistent task while the normal stop is performing actions.
Currently this causes the normal stop to error, but since
stopping a stopped datafeed is not an error this doesn't make
sense. Instead the force stop should just take precedence.

This is a followup to elastic#49191 and should really have been
included in the changes in that PR.
  • Loading branch information
droberts195 committed Nov 20, 2019
1 parent 38de033 commit 23be3ef
Showing 1 changed file with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
Expand Down Expand Up @@ -259,17 +258,18 @@ protected void taskOperation(StopDatafeedAction.Request request, TransportStartD
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
if ((e instanceof ResourceNotFoundException &&
Strings.isAllOrWildcard(new String[]{request.getDatafeedId()}))) {
datafeedTask.stop("stop_datafeed (api)", request.getStopTimeout());
// We validated that the datafeed names supplied in the request existed when we started processing the action.
// If the related task for one of them doesn't exist at this point then it must have been removed by a
// simultaneous force stop request. This is not an error.
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
listener.onResponse(new StopDatafeedAction.Response(true));
} else {
listener.onFailure(e);
}
}

@Override
protected void doRun() throws Exception {
protected void doRun() {
datafeedTask.stop("stop_datafeed (api)", request.getStopTimeout());
listener.onResponse(new StopDatafeedAction.Response(true));
}
Expand Down Expand Up @@ -343,7 +343,7 @@ protected StopDatafeedAction.Response newResponse(StopDatafeedAction.Request req
throw org.elasticsearch.ExceptionsHelper
.convertToElastic(failedNodeExceptions.get(0));
} else {
// This can happen we the actual task in the node no longer exists,
// This can happen when the actual task in the node no longer exists,
// which means the datafeed(s) have already been stopped.
return new StopDatafeedAction.Response(true);
}
Expand Down

0 comments on commit 23be3ef

Please sign in to comment.