Skip to content

Commit

Permalink
[ML] Job in index: Restore ability to update cluster state jobs (#35539)
Browse files Browse the repository at this point in the history
Job updates can apply to cluster state or index jobs this includes 
reverting a model snapshot and finalizing the job
  • Loading branch information
davidkyle authored Nov 14, 2018
1 parent 99798d5 commit 0046d55
Show file tree
Hide file tree
Showing 6 changed files with 418 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public static UpdateJobAction.Request parseRequest(String jobId, XContentParser

/** Indicates an update that was not triggered by a user */
private boolean isInternal;
private boolean waitForAck = true;

public Request(String jobId, JobUpdate update) {
this(jobId, update, false);
Expand Down Expand Up @@ -87,6 +88,14 @@ public boolean isInternal() {
return isInternal;
}

public boolean isWaitForAck() {
return waitForAck;
}

public void setWaitForAck(boolean waitForAck) {
this.waitForAck = waitForAck;
}

@Override
public ActionRequestValidationException validate() {
return null;
Expand All @@ -102,9 +111,10 @@ public void readFrom(StreamInput in) throws IOException {
} else {
isInternal = false;
}
// TODO jindex change CURRENT to specific version when feature branch is merged
if (in.getVersion().onOrAfter(Version.V_6_3_0) && in.getVersion().before(Version.CURRENT)) {
in.readBoolean(); // was waitForAck
if (in.getVersion().onOrAfter(Version.V_6_3_0)) {
waitForAck = in.readBoolean();
} else {
waitForAck = true;
}
}

Expand All @@ -116,9 +126,8 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_6_2_2)) {
out.writeBoolean(isInternal);
}
// TODO jindex change CURRENT to specific version when feature branch is merged
if (out.getVersion().onOrAfter(Version.V_6_3_0) && out.getVersion().before(Version.CURRENT)) {
out.writeBoolean(false); // was waitForAck
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
out.writeBoolean(waitForAck);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,27 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;

import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;

// This action is only called from modes before version 6.6.0
public class TransportFinalizeJobExecutionAction extends TransportMasterNodeAction<FinalizeJobExecutionAction.Request,
AcknowledgedResponse> {

Expand All @@ -45,9 +56,55 @@ protected AcknowledgedResponse newResponse() {
@Override
protected void masterOperation(FinalizeJobExecutionAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) {
// This action is no longer required but needs to be preserved
// in case it is called by an old node in a mixed cluster
listener.onResponse(new AcknowledgedResponse(true));

MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
List<String> jobsInClusterState = Arrays.stream(request.getJobIds())
.filter(id -> mlMetadata.getJobs().containsKey(id))
.collect(Collectors.toList());

// This action should not be called for jobs that have
// their configuration in index documents

if (jobsInClusterState.isEmpty()) {
// This action is a no-op for jobs not defined in the cluster state.
listener.onResponse(new AcknowledgedResponse(true));
return;
}

String jobIdString = String.join(",", jobsInClusterState);
String source = "finalize_job_execution [" + jobIdString + "]";
logger.debug("finalizing jobs [{}]", jobIdString);
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
XPackPlugin.checkReadyForXPackCustomMetadata(currentState);
MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState);
MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata);
Date finishedTime = new Date();

for (String jobId : jobsInClusterState) {
Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId));
jobBuilder.setFinishedTime(finishedTime);
mlMetadataBuilder.putJob(jobBuilder.build(), true);
}
ClusterState.Builder builder = ClusterState.builder(currentState);
return builder.metaData(new MetaData.Builder(currentState.metaData())
.putCustom(MlMetadata.TYPE, mlMetadataBuilder.build()))
.build();
}

@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState,
ClusterState newState) {
logger.debug("finalized job [{}]", jobIdString);
listener.onResponse(new AcknowledgedResponse(true));
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.job.config.Job;

/**
* Helper functions for managing cluster state job configurations
*/
public final class ClusterStateJobUpdate {

private ClusterStateJobUpdate() {
}

public static boolean jobIsInClusterState(ClusterState clusterState, String jobId) {
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
return mlMetadata.getJobs().containsKey(jobId);
}

public static boolean jobIsInMlMetadata(MlMetadata mlMetadata, String jobId) {
return mlMetadata.getJobs().containsKey(jobId);
}

public static ClusterState putJobInClusterState(Job job, boolean overwrite, ClusterState currentState) {
MlMetadata.Builder builder = createMlMetadataBuilder(currentState);
builder.putJob(job, overwrite);
return buildNewClusterState(currentState, builder);
}

private static MlMetadata.Builder createMlMetadataBuilder(ClusterState currentState) {
return new MlMetadata.Builder(MlMetadata.getMlMetadata(currentState));
}

private static ClusterState buildNewClusterState(ClusterState currentState, MlMetadata.Builder builder) {
XPackPlugin.checkReadyForXPackCustomMetadata(currentState);
ClusterState.Builder newState = ClusterState.builder(currentState);
newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, builder.build()).build());
return newState.build();
}
}
Loading

0 comments on commit 0046d55

Please sign in to comment.