diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java index 1076533660273..1a8aea05c458b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.FailedNodeException; @@ -16,6 +17,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; @@ -272,7 +274,12 @@ protected void taskOperation(CloseJobAction.Request request, TransportOpenJobAct threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { - listener.onFailure(e); + if (e instanceof ResourceNotFoundException && Strings.isAllOrWildcard(new String[]{request.getJobId()})) { + jobTask.closeJob("close job (api)"); + listener.onResponse(new CloseJobAction.Response(true)); + } else { + listener.onFailure(e); + } } @Override @@ -332,7 +339,10 @@ public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { @Override public void onFailure(Exception e) { final int slot = counter.incrementAndGet(); - failures.set(slot - 1, e); + if ((e instanceof ResourceNotFoundException && + Strings.isAllOrWildcard(new String[]{request.getJobId()})) == false) { + failures.set(slot - 1, e); + } if (slot == numberOfJobs) { sendResponseOrFailure(request.getJobId(), listener, failures); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java index 63c47996881c2..636138a855bce 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; @@ -187,7 +188,10 @@ public void onResponse(PersistentTasksCustomMetaData.PersistentTask persisten @Override public void onFailure(Exception e) { final int slot = counter.incrementAndGet(); - failures.set(slot - 1, e); + if ((e instanceof ResourceNotFoundException && + Strings.isAllOrWildcard(new String[]{request.getDatafeedId()})) == false) { + failures.set(slot - 1, e); + } if (slot == startedDatafeeds.size()) { sendResponseOrFailure(request.getDatafeedId(), listener, failures); } @@ -215,7 +219,13 @@ protected void taskOperation(StopDatafeedAction.Request request, TransportStartD threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { - listener.onFailure(e); + if ((e instanceof ResourceNotFoundException && + Strings.isAllOrWildcard(new String[]{request.getDatafeedId()}))) { + datafeedTask.stop("stop_datafeed (api)", request.getStopTimeout()); + listener.onResponse(new StopDatafeedAction.Response(true)); + } else { + listener.onFailure(e); + } } @Override