Skip to content

Commit

Permalink
[ML] JIndex: Prevent updates to migrating configs and upgrade tests (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkyle authored Dec 13, 2018
1 parent fdf48aa commit 2d3de5d
Show file tree
Hide file tree
Showing 17 changed files with 854 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ public static ElasticsearchStatusException badRequestException(String msg, Objec
return new ElasticsearchStatusException(msg, RestStatus.BAD_REQUEST, args);
}

public static ElasticsearchStatusException configHasNotBeenMigrated(String verb, String id) {
return new ElasticsearchStatusException("cannot {} as the configuration [{}] is temporarily pending migration",
RestStatus.SERVICE_UNAVAILABLE, verb, id);
}

/**
* Creates an error message that explains there are shard failures, displays info
* for the first failure (shard/reason) and kindly asks to see more info in the logs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
Expand Down Expand Up @@ -82,29 +81,23 @@ public void clusterChanged(ClusterChangedEvent event) {
}
PersistentTasksCustomMetaData previous = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTasksCustomMetaData current = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
if (Objects.equals(previous, current)) {
return;
}

Version minNodeVersion = event.state().nodes().getMinNodeVersion();
if (minNodeVersion.onOrAfter(Version.V_6_6_0)) {
// ok to migrate
mlConfigMigrator.migrateConfigsWithoutTasks(event.state(), ActionListener.wrap(
response -> threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state())),
e -> {
logger.error("error migrating ml configurations", e);
threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state()));
}
));
} else {
threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state()));
}

mlConfigMigrator.migrateConfigsWithoutTasks(event.state(), ActionListener.wrap(
response -> threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state())),
e -> {
logger.error("error migrating ml configurations", e);
threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state()));
}
));
}

private void auditChangesToMlTasks(PersistentTasksCustomMetaData current, PersistentTasksCustomMetaData previous,
ClusterState state) {

if (Objects.equals(previous, current)) {
return;
}

for (PersistentTask<?> currentTask : current.tasks()) {
Assignment currentAssignment = currentTask.getAssignment();
PersistentTask<?> previousTask = previous != null ? previous.getTask(currentTask.getId()) : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class MlConfigMigrator {
private static final Logger logger = LogManager.getLogger(MlConfigMigrator.class);

public static final String MIGRATED_FROM_VERSION = "migrated from version";
public static final Version MIN_NODE_VERSION = Version.V_6_6_0;

private final Client client;
private final ClusterService clusterService;
Expand Down Expand Up @@ -106,11 +107,19 @@ public MlConfigMigrator(Client client, ClusterService clusterService) {
*/
public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener<Boolean> listener) {

Version minNodeVersion = clusterState.nodes().getMinNodeVersion();
if (minNodeVersion.before(MIN_NODE_VERSION)) {
listener.onResponse(Boolean.FALSE);
return;
}

if (migrationInProgress.compareAndSet(false, true) == false) {
listener.onResponse(Boolean.FALSE);
return;
}

logger.debug("migrating ml configurations");

Collection<DatafeedConfig> datafeedsToMigrate = stoppedDatafeedConfigs(clusterState);
List<Job> jobsToMigrate = nonDeletingJobs(closedJobConfigs(clusterState)).stream()
.map(MlConfigMigrator::updateJobForMigration)
Expand Down Expand Up @@ -381,4 +390,60 @@ static List<String> filterFailedDatafeedConfigWrites(Set<String> failedDocumentI
.filter(id -> failedDocumentIds.contains(DatafeedConfig.documentId(id)) == false)
.collect(Collectors.toList());
}

/**
* Is the job a eligible for migration? Returns:
* False if the min node version of the cluster is before {@link #MIN_NODE_VERSION}
* False if the job is not in the cluster state
* False if the {@link Job#isDeleting()}
* False if the job has a persistent task
* True otherwise i.e. the job is present, not deleting
* and does not have a persistent task.
*
* @param jobId The job Id
* @param clusterState clusterstate
* @return A boolean depending on the conditions listed above
*/
public static boolean jobIsEligibleForMigration(String jobId, ClusterState clusterState) {
Version minNodeVersion = clusterState.nodes().getMinNodeVersion();
if (minNodeVersion.before(MIN_NODE_VERSION)) {
return false;
}

MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
Job job = mlMetadata.getJobs().get(jobId);

if (job == null || job.isDeleting()) {
return false;
}

PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
return MlTasks.openJobIds(persistentTasks).contains(jobId) == false;
}

/**
* Is the datafeed a eligible for migration? Returns:
* False if the min node version of the cluster is before {@link #MIN_NODE_VERSION}
* False if the datafeed is not in the cluster state
* False if the datafeed has a persistent task
* True otherwise i.e. the datafeed is present and does not have a persistent task.
*
* @param datafeedId The datafeed Id
* @param clusterState clusterstate
* @return A boolean depending on the conditions listed above
*/
public static boolean datafeedIsEligibleForMigration(String datafeedId, ClusterState clusterState) {
Version minNodeVersion = clusterState.nodes().getMinNodeVersion();
if (minNodeVersion.before(MIN_NODE_VERSION)) {
return false;
}

MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
if (mlMetadata.getDatafeeds().containsKey(datafeedId) == false) {
return false;
}

PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
return MlTasks.startedDatafeedIds(persistentTasks).contains(datafeedId) == false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MlConfigMigrator;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
Expand Down Expand Up @@ -72,6 +73,12 @@ protected AcknowledgedResponse newResponse() {
@Override
protected void masterOperation(DeleteDatafeedAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) {

if (MlConfigMigrator.datafeedIsEligibleForMigration(request.getDatafeedId(), state)) {
listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("delete datafeed", request.getDatafeedId()));
return;
}

if (request.isForce()) {
forceDeleteDatafeed(request, state, listener);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MlConfigMigrator;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
Expand Down Expand Up @@ -147,6 +148,12 @@ protected void masterOperation(DeleteJobAction.Request request, ClusterState sta
@Override
protected void masterOperation(Task task, DeleteJobAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) {

if (MlConfigMigrator.jobIsEligibleForMigration(request.getJobId(), state)) {
listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("delete job", request.getJobId()));
return;
}

logger.debug("Deleting job '{}'", request.getJobId());

if (request.isForce() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlConfigMigrator;
import org.elasticsearch.xpack.ml.job.ClusterStateJobUpdate;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
Expand Down Expand Up @@ -528,6 +529,11 @@ protected ClusterBlockException checkBlock(OpenJobAction.Request request, Cluste

@Override
protected void masterOperation(OpenJobAction.Request request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
if (MlConfigMigrator.jobIsEligibleForMigration(request.getJobParams().getJobId(), state)) {
listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("open job", request.getJobParams().getJobId()));
return;
}

OpenJobAction.JobParams jobParams = request.getJobParams();
if (licenseState.isMachineLearningAllowed()) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.MlConfigMigrator;
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
Expand Down Expand Up @@ -68,6 +69,11 @@ protected RevertModelSnapshotAction.Response newResponse() {
@Override
protected void masterOperation(RevertModelSnapshotAction.Request request, ClusterState state,
ActionListener<RevertModelSnapshotAction.Response> listener) {
if (MlConfigMigrator.jobIsEligibleForMigration(request.getJobId(), state)) {
listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("revert model snapshot", request.getJobId()));
return;
}

logger.debug("Received request to revert to snapshot id '{}' for job '{}', deleting intervening results: {}",
request.getSnapshotId(), request.getJobId(), request.getDeleteInterveningResults());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlConfigMigrator;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigReader;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
import org.elasticsearch.xpack.ml.datafeed.DatafeedNodeSelector;
Expand Down Expand Up @@ -139,6 +140,11 @@ protected void masterOperation(StartDatafeedAction.Request request, ClusterState
return;
}

if (MlConfigMigrator.datafeedIsEligibleForMigration(request.getParams().getDatafeedId(), state)) {
listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("start datafeed", request.getParams().getDatafeedId()));
return;
}

AtomicReference<DatafeedConfig> datafeedConfigHolder = new AtomicReference<>();
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MlConfigMigrator;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;

Expand Down Expand Up @@ -69,6 +70,11 @@ protected PutDatafeedAction.Response newResponse() {
protected void masterOperation(UpdateDatafeedAction.Request request, ClusterState state,
ActionListener<PutDatafeedAction.Response> listener) throws Exception {

if (MlConfigMigrator.datafeedIsEligibleForMigration(request.getUpdate().getId(), state)) {
listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("update datafeed", request.getUpdate().getId()));
return;
}

MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
boolean datafeedConfigIsInClusterState = mlMetadata.getDatafeed(request.getUpdate().getId()) != null;
if (datafeedConfigIsInClusterState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlConfigMigrator;
import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer;
import org.elasticsearch.xpack.ml.job.persistence.ExpandedIdsMatcher;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
Expand Down Expand Up @@ -436,7 +437,13 @@ public void onFailure(Exception e) {
}

public void updateJob(UpdateJobAction.Request request, ActionListener<PutJobAction.Response> actionListener) {
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state());
ClusterState clusterState = clusterService.state();
if (MlConfigMigrator.jobIsEligibleForMigration(request.getJobId(), clusterState)) {
actionListener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("update job", request.getJobId()));
return;
}

MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);

if (request.getJobUpdate().getGroups() != null && request.getJobUpdate().getGroups().isEmpty() == false) {

Expand Down Expand Up @@ -466,6 +473,7 @@ public void updateJob(UpdateJobAction.Request request, ActionListener<PutJobActi

private void updateJobPostInitialChecks(UpdateJobAction.Request request, MlMetadata mlMetadata,
ActionListener<PutJobAction.Response> actionListener) {

if (ClusterStateJobUpdate.jobIsInMlMetadata(mlMetadata, request.getJobId())) {
updateJobClusterState(request, actionListener);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,38 +139,4 @@ public void testClusterChanged_noPersistentTaskChanges() {
notifier.offMaster();
verify(configMigrator, never()).migrateConfigsWithoutTasks(any(), any());
}

public void testMigrateNotTriggered_GivenPre66Nodes() {
MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService);
notifier.onMaster();

ClusterState previous = ClusterState.builder(new ClusterName("_name"))
.build();

PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", null, null, tasksBuilder);
MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()).build();

// mixed 6.5 and 6.6 nodes
ClusterState current = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_5_0))
.add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0)))
.metaData(metaData)
.build();

notifier.clusterChanged(new ClusterChangedEvent("_test", current, previous));
verify(configMigrator, never()).migrateConfigsWithoutTasks(any(), any());

current = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_6_0))
.add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0)))
.metaData(metaData)
.build();

// all 6.6 nodes
notifier.clusterChanged(new ClusterChangedEvent("_test", current, previous));
verify(configMigrator, times(1)).migrateConfigsWithoutTasks(any(), any());
}
}
Loading

0 comments on commit 2d3de5d

Please sign in to comment.