Skip to content

Commit

Permalink
[7.11][ML] Only resume job from current snapshot if master node supports it (#66815) (#66921)
Browse files Browse the repository at this point in the history

Resuming a job from its current snapshot requires functionality that was
added to the _revert API in version `7.11`. We therefore need to ensure that
a job is only resumed from its current snapshot once the master node has been
upgraded to a version that supports this feature.

Closes #66752

Backport of #66815
  • Loading branch information
dimitris-athanasiou authored Jan 4, 2021
1 parent 4e0f7bb commit 9fa9eda
Showing 1 changed file with 8 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -62,6 +63,10 @@ public class OpenJobPersistentTasksExecutor extends AbstractJobPersistentTasksEx

private static final Logger logger = LogManager.getLogger(OpenJobPersistentTasksExecutor.class);

// Minimum master node version required to resume a job with a running datafeed from its
// current snapshot. That capability was added in 7.11, so the revert to the current snapshot
// is only attempted when the master node's version is on or after this one.
private static final Version MIN_MASTER_NODE_VERSION_FOR_REVERTING_TO_CURRENT_SNAPSHOT = Version.V_7_11_0;

public static String[] indicesOfInterest(String resultsIndex) {
if (resultsIndex == null) {
return new String[]{AnomalyDetectorsIndex.jobStateIndexPattern(), MlMetaIndex.indexName(),
Expand Down Expand Up @@ -205,7 +210,9 @@ private void runJob(JobTask jobTask, JobState jobState, OpenJobAction.JobParams

ActionListener<Boolean> hasRunningDatafeedTaskListener = ActionListener.wrap(
hasRunningDatafeed -> {
if (hasRunningDatafeed) {
if (hasRunningDatafeed && clusterState.nodes().getMasterNode().getVersion().onOrAfter(
MIN_MASTER_NODE_VERSION_FOR_REVERTING_TO_CURRENT_SNAPSHOT)) {

// This job has a running datafeed attached to it.
// In order to prevent gaps in the model we revert to the current snapshot deleting intervening results.
revertToCurrentSnapshot(jobTask.getJobId(), ActionListener.wrap(response -> openJob(jobTask), jobTask::markAsFailed));
Expand Down

0 comments on commit 9fa9eda

Please sign in to comment.