From cb06e363b1b1ea020126936fcbd6bf3bbffad924 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Fri, 9 Nov 2018 12:20:38 +0000 Subject: [PATCH 1/6] Put finalise job action back in --- .../TransportFinalizeJobExecutionAction.java | 64 ++++++++++++++- ...nsportFinalizeJobExecutionActionTests.java | 79 +++++++++++++++++++ 2 files changed, 140 insertions(+), 3 deletions(-) create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionActionTests.java diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java index c9fdd7b18fb53..0263f26933307 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java @@ -10,16 +10,27 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.XPackPlugin; +import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.stream.Collectors; + +// This action is only called from modes before version 6.6.0 public class TransportFinalizeJobExecutionAction extends TransportMasterNodeAction { @@ -45,9 +56,56 @@ protected AcknowledgedResponse newResponse() { @Override protected void masterOperation(FinalizeJobExecutionAction.Request request, ClusterState state, ActionListener listener) { - // This action is no longer required but needs to be preserved - // in case it is called by an old node in a mixed cluster - listener.onResponse(new AcknowledgedResponse(true)); + + MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); + List jobsInClusterState = Arrays.stream(request.getJobIds()) + .filter(id -> mlMetadata.getJobs().containsKey(id)) + .collect(Collectors.toList()); + + // As this action is only called by pre v6.6.0 nodes that cannot run jobs + // defined in the index this check should not be necessary i.e. the + // job config must be in the clusterstate + + if (jobsInClusterState.isEmpty()) { + // This action is a no-op for jobs not defined in the cluster state. + listener.onResponse(new AcknowledgedResponse(true)); + return; + } + + String jobIdString = String.join(",", jobsInClusterState); + String source = "finalize_job_execution [" + jobIdString + "]"; + logger.debug("finalizing jobs [{}]", jobIdString); + clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + XPackPlugin.checkReadyForXPackCustomMetadata(currentState); + MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState); + MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata); + Date finishedTime = new Date(); + + for (String jobId : jobsInClusterState) { + Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId)); + jobBuilder.setFinishedTime(finishedTime); + mlMetadataBuilder.putJob(jobBuilder.build(), true); + } + ClusterState.Builder builder = ClusterState.builder(currentState); + return builder.metaData(new MetaData.Builder(currentState.metaData()) + .putCustom(MlMetadata.TYPE, mlMetadataBuilder.build())) + .build(); + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, + ClusterState newState) { + logger.debug("finalized job [{}]", jobIdString); + listener.onResponse(new AcknowledgedResponse(true)); + } + }); } @Override diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionActionTests.java new file mode 100644 index 0000000000000..d2d69cd0d44ce --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionActionTests.java @@ -0,0 +1,79 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; +import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; + +import java.util.Date; +import java.util.concurrent.atomic.AtomicReference; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class TransportFinalizeJobExecutionActionTests extends ESTestCase { + + public void testOperation_noJobsInClusterState() { + ClusterService clusterService = mock(ClusterService.class); + TransportFinalizeJobExecutionAction action = createAction(clusterService); + + ClusterState clusterState = ClusterState.builder(new ClusterName("finalize-job-action-tests")).build(); + + FinalizeJobExecutionAction.Request request = new FinalizeJobExecutionAction.Request(new String[]{"index-job1", "index-job2"}); + AtomicReference ack = new AtomicReference<>(); + action.masterOperation(request, clusterState, ActionListener.wrap( + ack::set, + e -> fail(e.getMessage()) + )); + + assertTrue(ack.get().isAcknowledged()); + verify(clusterService, never()).submitStateUpdateTask(any(), any()); + } + + public void testOperation_jobInClusterState() { + ClusterService clusterService = mock(ClusterService.class); + TransportFinalizeJobExecutionAction action = createAction(clusterService); + + MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); + mlBuilder.putJob(BaseMlIntegTestCase.createFareQuoteJob("cs-job").build(new Date()), false); + + ClusterState clusterState = ClusterState.builder(new ClusterName("finalize-job-action-tests")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlBuilder.build())) + .build(); + + FinalizeJobExecutionAction.Request request = new FinalizeJobExecutionAction.Request(new String[]{"cs-job"}); + AtomicReference ack = new AtomicReference<>(); + action.masterOperation(request, clusterState, ActionListener.wrap( + ack::set, + e -> fail(e.getMessage()) + )); + + verify(clusterService, times(1)).submitStateUpdateTask(any(), any()); + } + + private TransportFinalizeJobExecutionAction createAction(ClusterService clusterService) { + return new TransportFinalizeJobExecutionAction(Settings.EMPTY, mock(TransportService.class), clusterService, + mock(ThreadPool.class), mock(ActionFilters.class), mock(IndexNameExpressionResolver.class)); + + } +} From 1bb6d5615cd6103e53238fe8fd037243416222c5 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Fri, 9 Nov 2018 14:22:44 +0000 Subject: [PATCH 2/6] Job updates can apply to cluster state or index jobs --- .../xpack/core/ml/action/UpdateJobAction.java | 21 ++- .../xpack/ml/job/ClusterStateJobUpdate.java | 38 +++++ .../xpack/ml/job/JobManager.java | 133 +++++++++++++----- 3 files changed, 151 insertions(+), 41 deletions(-) create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/ClusterStateJobUpdate.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateJobAction.java index c3bb4b7e0498d..1fb387b0b6c2a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateJobAction.java @@ -54,6 +54,7 @@ public static UpdateJobAction.Request parseRequest(String jobId, XContentParser /** Indicates an update that was not triggered by a user */ private boolean isInternal; + private boolean waitForAck = true; public Request(String jobId, JobUpdate update) { this(jobId, update, false); @@ -87,6 +88,14 @@ public boolean isInternal() { return isInternal; } + public boolean isWaitForAck() { + return waitForAck; + } + + public void setWaitForAck(boolean waitForAck) { + this.waitForAck = waitForAck; + } + @Override public ActionRequestValidationException validate() { return null; @@ -102,9 +111,10 @@ public void readFrom(StreamInput in) throws IOException { } else { isInternal = false; } - // TODO jindex change CURRENT to specific version when feature branch is merged - if (in.getVersion().onOrAfter(Version.V_6_3_0) && in.getVersion().before(Version.CURRENT)) { - in.readBoolean(); // was waitForAck + if (in.getVersion().onOrAfter(Version.V_6_3_0)) { + waitForAck = in.readBoolean(); + } else { + waitForAck = true; } } @@ -116,9 +126,8 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_6_2_2)) { out.writeBoolean(isInternal); } - // TODO jindex change CURRENT to specific version when feature branch is merged - if (out.getVersion().onOrAfter(Version.V_6_3_0) && out.getVersion().before(Version.CURRENT)) { - out.writeBoolean(false); // was waitForAck + if (out.getVersion().onOrAfter(Version.V_6_3_0)) { + out.writeBoolean(waitForAck); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/ClusterStateJobUpdate.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/ClusterStateJobUpdate.java new file mode 100644 index 0000000000000..a0adfd22a07c5 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/ClusterStateJobUpdate.java @@ -0,0 +1,38 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.xpack.core.XPackPlugin; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.job.config.Job; + +/** + * Helper functions for managing cluster state job configurations + */ +public final class ClusterStateJobUpdate { + + private ClusterStateJobUpdate() { + } + + public static ClusterState updateClusterState(Job job, boolean overwrite, ClusterState currentState) { + MlMetadata.Builder builder = createMlMetadataBuilder(currentState); + builder.putJob(job, overwrite); + return buildNewClusterState(currentState, builder); + } + + private static MlMetadata.Builder createMlMetadataBuilder(ClusterState currentState) { + return new MlMetadata.Builder(MlMetadata.getMlMetadata(currentState)); + } + + private static ClusterState buildNewClusterState(ClusterState currentState, MlMetadata.Builder builder) { + XPackPlugin.checkReadyForXPackCustomMetadata(currentState); + ClusterState.Builder newState = ClusterState.builder(currentState); + newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, builder.build()).build()); + return newState.build(); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 47880f840effe..f0bd8a7232e43 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -13,7 +13,9 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedConsumer; @@ -71,6 +73,7 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -415,50 +418,110 @@ public void onFailure(Exception e) { } public void updateJob(UpdateJobAction.Request request, ActionListener actionListener) { - - ActionListener postUpdateAction; - - // Autodetect must be updated if the fields that the C++ uses are changed - if (request.getJobUpdate().isAutodetectProcessUpdate()) { - postUpdateAction = ActionListener.wrap( + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state()); + if (mlMetadata.getJobs().containsKey(request.getJobId())) { + updateJobClusterState(request, actionListener); + } else { + updateJobIndex(request, ActionListener.wrap( updatedJob -> { - JobUpdate jobUpdate = request.getJobUpdate(); - if (isJobOpen(clusterService.state(), request.getJobId())) { - updateJobProcessNotifier.submitJobUpdate(UpdateParams.fromJobUpdate(jobUpdate), ActionListener.wrap( - isUpdated -> { - if (isUpdated) { - auditJobUpdatedIfNotInternal(request); - } - }, e -> { - // No need to do anything - } - )); - } + postJobUpdate(clusterService.state(), request); actionListener.onResponse(new PutJobAction.Response(updatedJob)); }, actionListener::onFailure - ); + )); + } + } + + private void postJobUpdate(ClusterState clusterState, UpdateJobAction.Request request) { + JobUpdate jobUpdate = request.getJobUpdate(); + + // Change is required if the fields that the C++ uses are being updated + boolean processUpdateRequired = jobUpdate.isAutodetectProcessUpdate(); + + if (processUpdateRequired && isJobOpen(clusterState, request.getJobId())) { + updateJobProcessNotifier.submitJobUpdate(UpdateParams.fromJobUpdate(jobUpdate), ActionListener.wrap( + isUpdated -> { + if (isUpdated) { + auditJobUpdatedIfNotInternal(request); + } + }, e -> { + // No need to do anything + } + )); } else { - postUpdateAction = ActionListener.wrap(job -> { - logger.debug("[{}] No process update required for job update: {}", () -> request.getJobId(), () -> { - try { - XContentBuilder jsonBuilder = XContentFactory.jsonBuilder(); - request.getJobUpdate().toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS); - return Strings.toString(jsonBuilder); - } catch (IOException e) { - return "(unprintable due to " + e.getMessage() + ")"; - } - }); + logger.debug("[{}] No process update required for job update: {}", () -> request.getJobId(), () -> { + try { + XContentBuilder jsonBuilder = XContentFactory.jsonBuilder(); + jobUpdate.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS); + return Strings.toString(jsonBuilder); + } catch (IOException e) { + return "(unprintable due to " + e.getMessage() + ")"; + } + }); - auditJobUpdatedIfNotInternal(request); - actionListener.onResponse(new PutJobAction.Response(job)); - }, - actionListener::onFailure); + auditJobUpdatedIfNotInternal(request); } + } - + private void updateJobIndex(UpdateJobAction.Request request, ActionListener updatedJobListener) { jobConfigProvider.updateJobWithValidation(request.getJobId(), request.getJobUpdate(), maxModelMemoryLimit, - this::validate, postUpdateAction); + this::validate, updatedJobListener); + } + + private void updateJobClusterState(UpdateJobAction.Request request, ActionListener actionListener) { + Job job = MlMetadata.getMlMetadata(clusterService.state()).getJobs().get(request.getJobId()); + validate(job, request.getJobUpdate(), ActionListener.wrap( + nullValue -> clusterStateJobUpdate(request, actionListener), + actionListener::onFailure)); + } + + private void clusterStateJobUpdate(UpdateJobAction.Request request, ActionListener actionListener) { + if (request.isWaitForAck()) { + // Use the ack cluster state update + clusterService.submitStateUpdateTask("update-job-" + request.getJobId(), + new AckedClusterStateUpdateTask(request, actionListener) { + private AtomicReference updatedJob = new AtomicReference<>(); + + @Override + protected PutJobAction.Response newResponse(boolean acknowledged) { + return new PutJobAction.Response(updatedJob.get()); + } + + @Override + public ClusterState execute(ClusterState currentState) { + Job job = MlMetadata.getMlMetadata(clusterService.state()).getJobs().get(request.getJobId()); + updatedJob.set(request.getJobUpdate().mergeWithJob(job, maxModelMemoryLimit)); + return ClusterStateJobUpdate.updateClusterState(updatedJob.get(), true, currentState); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + postJobUpdate(newState, request); + } + }); + } else { + clusterService.submitStateUpdateTask("update-job-" + request.getJobId(), new ClusterStateUpdateTask() { + private AtomicReference updatedJob = new AtomicReference<>(); + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + Job job = MlMetadata.getMlMetadata(clusterService.state()).getJobs().get(request.getJobId()); + updatedJob.set(request.getJobUpdate().mergeWithJob(job, maxModelMemoryLimit)); + return ClusterStateJobUpdate.updateClusterState(updatedJob.get(), true, currentState); + } + + @Override + public void onFailure(String source, Exception e) { + actionListener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + postJobUpdate(newState, request); + actionListener.onResponse(new PutJobAction.Response(updatedJob.get())); + } + }); + } } private void validate(Job job, JobUpdate jobUpdate, ActionListener handler) { From 48e726154ecee6b68408e9734e241d1ca565b9fd Mon Sep 17 00:00:00 2001 From: David Kyle Date: Fri, 9 Nov 2018 15:27:34 +0000 Subject: [PATCH 3/6] Put back cluster state updates in job manager For revert model snapshot, filter changed and calendar changed --- .../xpack/ml/job/ClusterStateJobUpdate.java | 9 ++ .../xpack/ml/job/JobManager.java | 97 +++++++++++++++---- 2 files changed, 86 insertions(+), 20 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/ClusterStateJobUpdate.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/ClusterStateJobUpdate.java index a0adfd22a07c5..947aa641a11ee 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/ClusterStateJobUpdate.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/ClusterStateJobUpdate.java @@ -19,6 +19,15 @@ public final class ClusterStateJobUpdate { private ClusterStateJobUpdate() { } + public static boolean jobIsInClusterState(ClusterState clusterState, String jobId) { + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); + return mlMetadata.getJobs().containsKey(jobId); + } + + public static boolean jobIsInClusterState(MlMetadata mlMetadata, String jobId) { + return mlMetadata.getJobs().containsKey(jobId); + } + public static ClusterState updateClusterState(Job job, boolean overwrite, ClusterState currentState) { MlMetadata.Builder builder = createMlMetadataBuilder(currentState); builder.putJob(job, overwrite); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index f0bd8a7232e43..999f20f3210ef 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -334,7 +334,7 @@ public void putJob(PutJobAction.Request request, AnalysisRegistry analysisRegist // Check for the job in the cluster state first MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(state); - if (currentMlMetadata.getJobs().containsKey(job.getId())) { + if (ClusterStateJobUpdate.jobIsInClusterState(currentMlMetadata, job.getId())) { actionListener.onFailure(ExceptionsHelper.jobAlreadyExists(job.getId())); return; } @@ -419,7 +419,7 @@ public void onFailure(Exception e) { public void updateJob(UpdateJobAction.Request request, ActionListener actionListener) { MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state()); - if (mlMetadata.getJobs().containsKey(request.getJobId())) { + if (ClusterStateJobUpdate.jobIsInClusterState(mlMetadata, request.getJobId())) { updateJobClusterState(request, actionListener); } else { updateJobIndex(request, ActionListener.wrap( @@ -607,10 +607,25 @@ public void notifyFilterChanged(MlFilter filter, Set addedItems, Set clusterStateJobs = expandJobsFromClusterState(MetaData.ALL, true, clusterService.state()); + jobConfigProvider.findJobsWithCustomRules(ActionListener.wrap( - jobBuilders -> { + indexJobs -> { threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> { - for (Job job: jobBuilders) { + + List allJobs = new ArrayList<>(); + // Check for duplicate jobs + for (Job indexJob : indexJobs) { + if (clusterStateJobs.containsKey(indexJob.getId())) { + logger.error("[" + indexJob.getId() + "] job configuration exists in both clusterstate and index"); + } else { + allJobs.add(indexJob); + } + } + allJobs.addAll(clusterStateJobs.values()); + + for (Job job: allJobs) { Set jobFilters = job.getAnalysisConfig().extractReferencedFilters(); ClusterState clusterState = clusterService.state(); if (jobFilters.contains(filter.getId())) { @@ -668,7 +683,16 @@ public void updateProcessOnCalendarChanged(List calendarJobIds, ActionLi return; } - // calendarJobIds may be a group or job + // Get the cluster state jobs that match + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); + List existingJobsOrGroups = + calendarJobIds.stream().filter(mlMetadata::isGroupOrJob).collect(Collectors.toList()); + + Set clusterStateIds = new HashSet<>(); + existingJobsOrGroups.forEach(jobId -> clusterStateIds.addAll(mlMetadata.expandJobIds(jobId, true))); + + // calendarJobIds may be a group or job. + // Expand the groups to the constituent job ids jobConfigProvider.expandGroupIds(calendarJobIds, ActionListener.wrap( expandedIds -> { threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> { @@ -676,6 +700,9 @@ public void updateProcessOnCalendarChanged(List calendarJobIds, ActionLi // which are job ids rather than group Ids. expandedIds.addAll(calendarJobIds); + // Merge in the cluster state job ids + expandedIds.addAll(clusterStateIds); + for (String jobId : expandedIds) { if (isJobOpen(clusterState, jobId)) { updateJobProcessNotifier.submitJobUpdate(UpdateParams.scheduledEventsUpdate(jobId), ActionListener.wrap( @@ -727,21 +754,51 @@ public void revertSnapshot(RevertModelSnapshotAction.Request request, ActionList // Step 1. update the job // ------- - Consumer updateJobHandler = response -> { - JobUpdate update = new JobUpdate.Builder(request.getJobId()) - .setModelSnapshotId(modelSnapshot.getSnapshotId()) - .setEstablishedModelMemory(response) - .build(); - - jobConfigProvider.updateJob(request.getJobId(), update, maxModelMemoryLimit, ActionListener.wrap( - job -> { - auditor.info(request.getJobId(), - Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription())); - updateHandler.accept(Boolean.TRUE); - }, - actionListener::onFailure - )); - }; + + Consumer updateJobHandler; + + if (ClusterStateJobUpdate.jobIsInClusterState(clusterService.state(), request.getJobId())) { + updateJobHandler = response -> clusterService.submitStateUpdateTask("revert-snapshot-" + request.getJobId(), + new AckedClusterStateUpdateTask(request, ActionListener.wrap(updateHandler, actionListener::onFailure)) { + + @Override + protected Boolean newResponse(boolean acknowledged) { + if (acknowledged) { + auditor.info(request.getJobId(), + Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription())); + return true; + } + actionListener.onFailure(new IllegalStateException("Could not revert modelSnapshot on job [" + + request.getJobId() + "], not acknowledged by master.")); + return false; + } + + @Override + public ClusterState execute(ClusterState currentState) { + Job job = MlMetadata.getMlMetadata(currentState).getJobs().get(request.getJobId()); + Job.Builder builder = new Job.Builder(job); + builder.setModelSnapshotId(modelSnapshot.getSnapshotId()); + builder.setEstablishedModelMemory(response); + return ClusterStateJobUpdate.updateClusterState(builder.build(), true, currentState); + } + }); + } else { + updateJobHandler = response -> { + JobUpdate update = new JobUpdate.Builder(request.getJobId()) + .setModelSnapshotId(modelSnapshot.getSnapshotId()) + .setEstablishedModelMemory(response) + .build(); + + jobConfigProvider.updateJob(request.getJobId(), update, maxModelMemoryLimit, ActionListener.wrap( + job -> { + auditor.info(request.getJobId(), + Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription())); + updateHandler.accept(Boolean.TRUE); + }, + actionListener::onFailure + )); + }; + } // Step 0. Find the appropriate established model memory for the reverted job // ------- From 40def8c3b053aaf3b743d5fe3628ac02816138ad Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 14 Nov 2018 09:50:50 +0000 Subject: [PATCH 4/6] test --- .../xpack/ml/job/ClusterStateJobUpdate.java | 4 +- .../xpack/ml/job/JobManager.java | 14 +++--- .../xpack/ml/job/JobManagerTests.java | 44 ++++++++++++++++++- 3 files changed, 52 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/ClusterStateJobUpdate.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/ClusterStateJobUpdate.java index 947aa641a11ee..19f32730bc31d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/ClusterStateJobUpdate.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/ClusterStateJobUpdate.java @@ -24,11 +24,11 @@ public static boolean jobIsInClusterState(ClusterState clusterState, String jobI return mlMetadata.getJobs().containsKey(jobId); } - public static boolean jobIsInClusterState(MlMetadata mlMetadata, String jobId) { + public static boolean jobIsInMlMetadata(MlMetadata mlMetadata, String jobId) { return mlMetadata.getJobs().containsKey(jobId); } - public static ClusterState updateClusterState(Job job, boolean overwrite, ClusterState currentState) { + public static ClusterState putJobInClusterState(Job job, boolean overwrite, ClusterState currentState) { MlMetadata.Builder builder = createMlMetadataBuilder(currentState); builder.putJob(job, overwrite); return buildNewClusterState(currentState, builder); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 999f20f3210ef..3419fb7cb8269 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -334,7 +334,7 @@ public void putJob(PutJobAction.Request request, AnalysisRegistry analysisRegist // Check for the job in the cluster state first MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(state); - if (ClusterStateJobUpdate.jobIsInClusterState(currentMlMetadata, job.getId())) { + if (ClusterStateJobUpdate.jobIsInMlMetadata(currentMlMetadata, job.getId())) { actionListener.onFailure(ExceptionsHelper.jobAlreadyExists(job.getId())); return; } @@ -419,7 +419,7 @@ public void onFailure(Exception e) { public void updateJob(UpdateJobAction.Request request, ActionListener actionListener) { MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state()); - if (ClusterStateJobUpdate.jobIsInClusterState(mlMetadata, request.getJobId())) { + if (ClusterStateJobUpdate.jobIsInMlMetadata(mlMetadata, request.getJobId())) { updateJobClusterState(request, actionListener); } else { updateJobIndex(request, ActionListener.wrap( @@ -491,7 +491,7 @@ protected PutJobAction.Response newResponse(boolean acknowledged) { public ClusterState execute(ClusterState currentState) { Job job = MlMetadata.getMlMetadata(clusterService.state()).getJobs().get(request.getJobId()); updatedJob.set(request.getJobUpdate().mergeWithJob(job, maxModelMemoryLimit)); - return ClusterStateJobUpdate.updateClusterState(updatedJob.get(), true, currentState); + return ClusterStateJobUpdate.putJobInClusterState(updatedJob.get(), true, currentState); } @Override @@ -507,7 +507,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS public ClusterState execute(ClusterState currentState) throws Exception { Job job = MlMetadata.getMlMetadata(clusterService.state()).getJobs().get(request.getJobId()); updatedJob.set(request.getJobUpdate().mergeWithJob(job, maxModelMemoryLimit)); - return ClusterStateJobUpdate.updateClusterState(updatedJob.get(), true, currentState); + return ClusterStateJobUpdate.putJobInClusterState(updatedJob.get(), true, currentState); } @Override @@ -608,7 +608,7 @@ public void notifyFilterChanged(MlFilter filter, Set addedItems, Set clusterStateJobs = expandJobsFromClusterState(MetaData.ALL, true, clusterService.state()); + Map clusterStateJobs = expandJobsFromClusterState(MetaData.ALL, clusterService.state()); jobConfigProvider.findJobsWithCustomRules(ActionListener.wrap( indexJobs -> { @@ -689,7 +689,7 @@ public void updateProcessOnCalendarChanged(List calendarJobIds, ActionLi calendarJobIds.stream().filter(mlMetadata::isGroupOrJob).collect(Collectors.toList()); Set clusterStateIds = new HashSet<>(); - existingJobsOrGroups.forEach(jobId -> clusterStateIds.addAll(mlMetadata.expandJobIds(jobId, true))); + existingJobsOrGroups.forEach(jobId -> clusterStateIds.addAll(mlMetadata.expandJobIds(jobId))); // calendarJobIds may be a group or job. // Expand the groups to the constituent job ids @@ -779,7 +779,7 @@ public ClusterState execute(ClusterState currentState) { Job.Builder builder = new Job.Builder(job); builder.setModelSnapshotId(modelSnapshot.getSnapshotId()); builder.setEstablishedModelMemory(response); - return ClusterStateJobUpdate.updateClusterState(builder.build(), true, currentState); + return ClusterStateJobUpdate.putJobInClusterState(builder.build(), true, currentState); } }); } else { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index 41907d52dd048..654edff394db4 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.PutJobAction; +import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; @@ -41,6 +42,8 @@ import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.config.RuleScope; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzerTests; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; @@ -66,6 +69,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.stream.Collectors; import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder; @@ -80,8 +84,10 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -495,7 +501,7 @@ public void testPutJob_AddsCreateTime() throws IOException { AckedClusterStateUpdateTask task = (AckedClusterStateUpdateTask) invocation.getArguments()[1]; task.onAllNodesAcked(null); return null; - }).when(clusterService).submitStateUpdateTask(Matchers.eq("put-job-foo"), any(AckedClusterStateUpdateTask.class)); + }).when(clusterService).submitStateUpdateTask(eq("put-job-foo"), any(AckedClusterStateUpdateTask.class)); ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(Job.class); doAnswer(invocation -> { @@ -850,6 +856,42 @@ public void testUpdateProcessOnCalendarChanged_GivenGroups() throws IOException assertThat(capturedUpdateParams.get(1).isUpdateScheduledEvents(), is(true)); } + public void testRevertSnapshot_GivenJobInClusterState() { + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(buildJobBuilder("cs-revert").build(), false); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + when(clusterService.state()).thenReturn(clusterState); + + ClusterSettings clusterSettings = new ClusterSettings(environment.settings(), + Collections.singleton(MachineLearningField.MAX_MODEL_MEMORY_LIMIT)); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + + JobConfigProvider jobConfigProvider = mock(JobConfigProvider.class); + JobManager jobManager = new JobManager(environment, environment.settings(), jobResultsProvider, clusterService, + auditor, threadPool, mock(Client.class), updateJobProcessNotifier, jobConfigProvider); + + RevertModelSnapshotAction.Request request = new RevertModelSnapshotAction.Request("cs-revert", "ms-1"); + + ModelSnapshot modelSnapshot = mock(ModelSnapshot.class); + ModelSizeStats modelSizeStats = mock(ModelSizeStats.class); + when(modelSnapshot.getModelSizeStats()).thenReturn(modelSizeStats); + + + doAnswer(invocation -> { + Consumer listener = (Consumer) invocation.getArguments()[3]; + listener.accept(100L); + return null; + }).when(jobResultsProvider).getEstablishedMemoryUsage(eq("cs-revert"), any(), any(), any(), any()); + + jobManager.revertSnapshot(request, mock(ActionListener.class), modelSnapshot); + verify(clusterService, times(1)).submitStateUpdateTask(eq("revert-snapshot-cs-revert"), any(AckedClusterStateUpdateTask.class)); + verify(jobConfigProvider, never()).updateJob(any(), any(), any(), any()); + } + private Job.Builder createJob() { Detector.Builder d1 = new Detector.Builder("info_content", "domain"); d1.setOverFieldName("client"); From 2fb4e96e9c2fcd5128921625eb1bf9f47addf4e9 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 14 Nov 2018 12:45:04 +0000 Subject: [PATCH 5/6] Update misleading comment --- .../xpack/ml/action/TransportFinalizeJobExecutionAction.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java index 0263f26933307..5562273aed181 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java @@ -62,9 +62,8 @@ protected void masterOperation(FinalizeJobExecutionAction.Request request, Clust .filter(id -> mlMetadata.getJobs().containsKey(id)) .collect(Collectors.toList()); - // As this action is only called by pre v6.6.0 nodes that cannot run jobs - // defined in the index this check should not be necessary i.e. the - // job config must be in the clusterstate + // This action should not be called for jobs that have + // their configuration in index documents if (jobsInClusterState.isEmpty()) { // This action is a no-op for jobs not defined in the cluster state. From 307b46b5cf9dfd2a8d89959f54dfbc9e059be44f Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 14 Nov 2018 13:20:11 +0000 Subject: [PATCH 6/6] Fix checkstyle --- .../xpack/ml/action/TransportFinalizeJobExecutionAction.java | 2 +- .../java/org/elasticsearch/xpack/ml/job/JobManagerTests.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java index 5562273aed181..6a9dcd608208f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java @@ -63,7 +63,7 @@ protected void masterOperation(FinalizeJobExecutionAction.Request request, Clust .collect(Collectors.toList()); // This action should not be called for jobs that have - // their configuration in index documents + // their configuration in index documents if (jobsInClusterState.isEmpty()) { // This action is a no-op for jobs not defined in the cluster state. diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index 654edff394db4..4aba53d2614cb 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -53,7 +53,6 @@ import org.elasticsearch.xpack.ml.notifications.Auditor; import org.junit.Before; import org.mockito.ArgumentCaptor; -import org.mockito.Matchers; import org.mockito.Mockito; import java.io.IOException;