diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateJobAction.java index c3bb4b7e0498d..1fb387b0b6c2a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateJobAction.java @@ -54,6 +54,7 @@ public static UpdateJobAction.Request parseRequest(String jobId, XContentParser /** Indicates an update that was not triggered by a user */ private boolean isInternal; + private boolean waitForAck = true; public Request(String jobId, JobUpdate update) { this(jobId, update, false); @@ -87,6 +88,14 @@ public boolean isInternal() { return isInternal; } + public boolean isWaitForAck() { + return waitForAck; + } + + public void setWaitForAck(boolean waitForAck) { + this.waitForAck = waitForAck; + } + @Override public ActionRequestValidationException validate() { return null; @@ -102,9 +111,10 @@ public void readFrom(StreamInput in) throws IOException { } else { isInternal = false; } - // TODO jindex change CURRENT to specific version when feature branch is merged - if (in.getVersion().onOrAfter(Version.V_6_3_0) && in.getVersion().before(Version.CURRENT)) { - in.readBoolean(); // was waitForAck + if (in.getVersion().onOrAfter(Version.V_6_3_0)) { + waitForAck = in.readBoolean(); + } else { + waitForAck = true; } } @@ -116,9 +126,8 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_6_2_2)) { out.writeBoolean(isInternal); } - // TODO jindex change CURRENT to specific version when feature branch is merged - if (out.getVersion().onOrAfter(Version.V_6_3_0) && out.getVersion().before(Version.CURRENT)) { - out.writeBoolean(false); // was waitForAck + if (out.getVersion().onOrAfter(Version.V_6_3_0)) { + out.writeBoolean(waitForAck); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java index c9fdd7b18fb53..6a9dcd608208f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java @@ -10,16 +10,27 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.XPackPlugin; +import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.stream.Collectors; + +// This action is only called from modes before version 6.6.0 public class TransportFinalizeJobExecutionAction extends TransportMasterNodeAction { @@ -45,9 +56,55 @@ protected AcknowledgedResponse newResponse() { @Override protected void masterOperation(FinalizeJobExecutionAction.Request request, ClusterState state, ActionListener listener) { - // This action is no longer required but needs to be preserved - // in case it is called by an old node in a mixed cluster - listener.onResponse(new AcknowledgedResponse(true)); + + MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); + List jobsInClusterState = Arrays.stream(request.getJobIds()) + .filter(id -> mlMetadata.getJobs().containsKey(id)) + .collect(Collectors.toList()); + + // This action should not be called for jobs that have + // their configuration in index documents + + if (jobsInClusterState.isEmpty()) { + // This action is a no-op for jobs not defined in the cluster state. + listener.onResponse(new AcknowledgedResponse(true)); + return; + } + + String jobIdString = String.join(",", jobsInClusterState); + String source = "finalize_job_execution [" + jobIdString + "]"; + logger.debug("finalizing jobs [{}]", jobIdString); + clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + XPackPlugin.checkReadyForXPackCustomMetadata(currentState); + MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState); + MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata); + Date finishedTime = new Date(); + + for (String jobId : jobsInClusterState) { + Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId)); + jobBuilder.setFinishedTime(finishedTime); + mlMetadataBuilder.putJob(jobBuilder.build(), true); + } + ClusterState.Builder builder = ClusterState.builder(currentState); + return builder.metaData(new MetaData.Builder(currentState.metaData()) + .putCustom(MlMetadata.TYPE, mlMetadataBuilder.build())) + .build(); + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, + ClusterState newState) { + logger.debug("finalized job [{}]", jobIdString); + listener.onResponse(new AcknowledgedResponse(true)); + } + }); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/ClusterStateJobUpdate.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/ClusterStateJobUpdate.java new file mode 100644 index 0000000000000..19f32730bc31d --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/ClusterStateJobUpdate.java @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.xpack.core.XPackPlugin; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.job.config.Job; + +/** + * Helper functions for managing cluster state job configurations + */ +public final class ClusterStateJobUpdate { + + private ClusterStateJobUpdate() { + } + + public static boolean jobIsInClusterState(ClusterState clusterState, String jobId) { + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); + return mlMetadata.getJobs().containsKey(jobId); + } + + public static boolean jobIsInMlMetadata(MlMetadata mlMetadata, String jobId) { + return mlMetadata.getJobs().containsKey(jobId); + } + + public static ClusterState putJobInClusterState(Job job, boolean overwrite, ClusterState currentState) { + MlMetadata.Builder builder = createMlMetadataBuilder(currentState); + builder.putJob(job, overwrite); + return buildNewClusterState(currentState, builder); + } + + private static MlMetadata.Builder createMlMetadataBuilder(ClusterState currentState) { + return new MlMetadata.Builder(MlMetadata.getMlMetadata(currentState)); + } + + private static ClusterState buildNewClusterState(ClusterState currentState, MlMetadata.Builder builder) { + XPackPlugin.checkReadyForXPackCustomMetadata(currentState); + ClusterState.Builder newState = ClusterState.builder(currentState); + newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, builder.build()).build()); + return newState.build(); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 47880f840effe..3419fb7cb8269 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -13,7 +13,9 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedConsumer; @@ -71,6 +73,7 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -331,7 +334,7 @@ public void putJob(PutJobAction.Request request, AnalysisRegistry analysisRegist // Check for the job in the cluster state first MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(state); - if (currentMlMetadata.getJobs().containsKey(job.getId())) { + if (ClusterStateJobUpdate.jobIsInMlMetadata(currentMlMetadata, job.getId())) { actionListener.onFailure(ExceptionsHelper.jobAlreadyExists(job.getId())); return; } @@ -415,50 +418,110 @@ public void onFailure(Exception e) { } public void updateJob(UpdateJobAction.Request request, ActionListener actionListener) { - - ActionListener postUpdateAction; - - // Autodetect must be updated if the fields that the C++ uses are changed - if (request.getJobUpdate().isAutodetectProcessUpdate()) { - postUpdateAction = ActionListener.wrap( + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state()); + if (ClusterStateJobUpdate.jobIsInMlMetadata(mlMetadata, request.getJobId())) { + updateJobClusterState(request, actionListener); + } else { + updateJobIndex(request, ActionListener.wrap( updatedJob -> { - JobUpdate jobUpdate = request.getJobUpdate(); - if (isJobOpen(clusterService.state(), request.getJobId())) { - updateJobProcessNotifier.submitJobUpdate(UpdateParams.fromJobUpdate(jobUpdate), ActionListener.wrap( - isUpdated -> { - if (isUpdated) { - auditJobUpdatedIfNotInternal(request); - } - }, e -> { - // No need to do anything - } - )); - } + postJobUpdate(clusterService.state(), request); actionListener.onResponse(new PutJobAction.Response(updatedJob)); }, actionListener::onFailure - ); + )); + } + } + + private void postJobUpdate(ClusterState clusterState, UpdateJobAction.Request request) { + JobUpdate jobUpdate = request.getJobUpdate(); + + // Change is required if the fields that the C++ uses are being updated + boolean processUpdateRequired = jobUpdate.isAutodetectProcessUpdate(); + + if (processUpdateRequired && isJobOpen(clusterState, request.getJobId())) { + updateJobProcessNotifier.submitJobUpdate(UpdateParams.fromJobUpdate(jobUpdate), ActionListener.wrap( + isUpdated -> { + if (isUpdated) { + auditJobUpdatedIfNotInternal(request); + } + }, e -> { + // No need to do anything + } + )); } else { - postUpdateAction = ActionListener.wrap(job -> { - logger.debug("[{}] No process update required for job update: {}", () -> request.getJobId(), () -> { - try { - XContentBuilder jsonBuilder = XContentFactory.jsonBuilder(); - request.getJobUpdate().toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS); - return Strings.toString(jsonBuilder); - } catch (IOException e) { - return "(unprintable due to " + e.getMessage() + ")"; - } - }); + logger.debug("[{}] No process update required for job update: {}", () -> request.getJobId(), () -> { + try { + XContentBuilder jsonBuilder = XContentFactory.jsonBuilder(); + jobUpdate.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS); + return Strings.toString(jsonBuilder); + } catch (IOException e) { + return "(unprintable due to " + e.getMessage() + ")"; + } + }); - auditJobUpdatedIfNotInternal(request); - actionListener.onResponse(new PutJobAction.Response(job)); - }, - actionListener::onFailure); + auditJobUpdatedIfNotInternal(request); } + } - + private void updateJobIndex(UpdateJobAction.Request request, ActionListener updatedJobListener) { jobConfigProvider.updateJobWithValidation(request.getJobId(), request.getJobUpdate(), maxModelMemoryLimit, - this::validate, postUpdateAction); + this::validate, updatedJobListener); + } + + private void updateJobClusterState(UpdateJobAction.Request request, ActionListener actionListener) { + Job job = MlMetadata.getMlMetadata(clusterService.state()).getJobs().get(request.getJobId()); + validate(job, request.getJobUpdate(), ActionListener.wrap( + nullValue -> clusterStateJobUpdate(request, actionListener), + actionListener::onFailure)); + } + + private void clusterStateJobUpdate(UpdateJobAction.Request request, ActionListener actionListener) { + if (request.isWaitForAck()) { + // Use the ack cluster state update + clusterService.submitStateUpdateTask("update-job-" + request.getJobId(), + new AckedClusterStateUpdateTask(request, actionListener) { + private AtomicReference updatedJob = new AtomicReference<>(); + + @Override + protected PutJobAction.Response newResponse(boolean acknowledged) { + return new PutJobAction.Response(updatedJob.get()); + } + + @Override + public ClusterState execute(ClusterState currentState) { + Job job = MlMetadata.getMlMetadata(clusterService.state()).getJobs().get(request.getJobId()); + updatedJob.set(request.getJobUpdate().mergeWithJob(job, maxModelMemoryLimit)); + return ClusterStateJobUpdate.putJobInClusterState(updatedJob.get(), true, currentState); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + postJobUpdate(newState, request); + } + }); + } else { + clusterService.submitStateUpdateTask("update-job-" + request.getJobId(), new ClusterStateUpdateTask() { + private AtomicReference updatedJob = new AtomicReference<>(); + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + Job job = MlMetadata.getMlMetadata(clusterService.state()).getJobs().get(request.getJobId()); + updatedJob.set(request.getJobUpdate().mergeWithJob(job, maxModelMemoryLimit)); + return ClusterStateJobUpdate.putJobInClusterState(updatedJob.get(), true, currentState); + } + + @Override + public void onFailure(String source, Exception e) { + actionListener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + postJobUpdate(newState, request); + actionListener.onResponse(new PutJobAction.Response(updatedJob.get())); + } + }); + } } private void validate(Job job, JobUpdate jobUpdate, ActionListener handler) { @@ -544,10 +607,25 @@ public void notifyFilterChanged(MlFilter filter, Set addedItems, Set clusterStateJobs = expandJobsFromClusterState(MetaData.ALL, clusterService.state()); + jobConfigProvider.findJobsWithCustomRules(ActionListener.wrap( - jobBuilders -> { + indexJobs -> { threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> { - for (Job job: jobBuilders) { + + List allJobs = new ArrayList<>(); + // Check for duplicate jobs + for (Job indexJob : indexJobs) { + if (clusterStateJobs.containsKey(indexJob.getId())) { + logger.error("[" + indexJob.getId() + "] job configuration exists in both clusterstate and index"); + } else { + allJobs.add(indexJob); + } + } + allJobs.addAll(clusterStateJobs.values()); + + for (Job job: allJobs) { Set jobFilters = job.getAnalysisConfig().extractReferencedFilters(); ClusterState clusterState = clusterService.state(); if (jobFilters.contains(filter.getId())) { @@ -605,7 +683,16 @@ public void updateProcessOnCalendarChanged(List calendarJobIds, ActionLi return; } - // calendarJobIds may be a group or job + // Get the cluster state jobs that match + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); + List existingJobsOrGroups = + calendarJobIds.stream().filter(mlMetadata::isGroupOrJob).collect(Collectors.toList()); + + Set clusterStateIds = new HashSet<>(); + existingJobsOrGroups.forEach(jobId -> clusterStateIds.addAll(mlMetadata.expandJobIds(jobId))); + + // calendarJobIds may be a group or job. + // Expand the groups to the constituent job ids jobConfigProvider.expandGroupIds(calendarJobIds, ActionListener.wrap( expandedIds -> { threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> { @@ -613,6 +700,9 @@ public void updateProcessOnCalendarChanged(List calendarJobIds, ActionLi // which are job ids rather than group Ids. expandedIds.addAll(calendarJobIds); + // Merge in the cluster state job ids + expandedIds.addAll(clusterStateIds); + for (String jobId : expandedIds) { if (isJobOpen(clusterState, jobId)) { updateJobProcessNotifier.submitJobUpdate(UpdateParams.scheduledEventsUpdate(jobId), ActionListener.wrap( @@ -664,21 +754,51 @@ public void revertSnapshot(RevertModelSnapshotAction.Request request, ActionList // Step 1. update the job // ------- - Consumer updateJobHandler = response -> { - JobUpdate update = new JobUpdate.Builder(request.getJobId()) - .setModelSnapshotId(modelSnapshot.getSnapshotId()) - .setEstablishedModelMemory(response) - .build(); - - jobConfigProvider.updateJob(request.getJobId(), update, maxModelMemoryLimit, ActionListener.wrap( - job -> { - auditor.info(request.getJobId(), - Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription())); - updateHandler.accept(Boolean.TRUE); - }, - actionListener::onFailure - )); - }; + + Consumer updateJobHandler; + + if (ClusterStateJobUpdate.jobIsInClusterState(clusterService.state(), request.getJobId())) { + updateJobHandler = response -> clusterService.submitStateUpdateTask("revert-snapshot-" + request.getJobId(), + new AckedClusterStateUpdateTask(request, ActionListener.wrap(updateHandler, actionListener::onFailure)) { + + @Override + protected Boolean newResponse(boolean acknowledged) { + if (acknowledged) { + auditor.info(request.getJobId(), + Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription())); + return true; + } + actionListener.onFailure(new IllegalStateException("Could not revert modelSnapshot on job [" + + request.getJobId() + "], not acknowledged by master.")); + return false; + } + + @Override + public ClusterState execute(ClusterState currentState) { + Job job = MlMetadata.getMlMetadata(currentState).getJobs().get(request.getJobId()); + Job.Builder builder = new Job.Builder(job); + builder.setModelSnapshotId(modelSnapshot.getSnapshotId()); + builder.setEstablishedModelMemory(response); + return ClusterStateJobUpdate.putJobInClusterState(builder.build(), true, currentState); + } + }); + } else { + updateJobHandler = response -> { + JobUpdate update = new JobUpdate.Builder(request.getJobId()) + .setModelSnapshotId(modelSnapshot.getSnapshotId()) + .setEstablishedModelMemory(response) + .build(); + + jobConfigProvider.updateJob(request.getJobId(), update, maxModelMemoryLimit, ActionListener.wrap( + job -> { + auditor.info(request.getJobId(), + Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription())); + updateHandler.accept(Boolean.TRUE); + }, + actionListener::onFailure + )); + }; + } // Step 0. Find the appropriate established model memory for the reverted job // ------- diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionActionTests.java new file mode 100644 index 0000000000000..d2d69cd0d44ce --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionActionTests.java @@ -0,0 +1,79 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; +import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; + +import java.util.Date; +import java.util.concurrent.atomic.AtomicReference; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class TransportFinalizeJobExecutionActionTests extends ESTestCase { + + public void testOperation_noJobsInClusterState() { + ClusterService clusterService = mock(ClusterService.class); + TransportFinalizeJobExecutionAction action = createAction(clusterService); + + ClusterState clusterState = ClusterState.builder(new ClusterName("finalize-job-action-tests")).build(); + + FinalizeJobExecutionAction.Request request = new FinalizeJobExecutionAction.Request(new String[]{"index-job1", "index-job2"}); + AtomicReference ack = new AtomicReference<>(); + action.masterOperation(request, clusterState, ActionListener.wrap( + ack::set, + e -> fail(e.getMessage()) + )); + + assertTrue(ack.get().isAcknowledged()); + verify(clusterService, never()).submitStateUpdateTask(any(), any()); + } + + public void testOperation_jobInClusterState() { + ClusterService clusterService = mock(ClusterService.class); + TransportFinalizeJobExecutionAction action = createAction(clusterService); + + MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); + mlBuilder.putJob(BaseMlIntegTestCase.createFareQuoteJob("cs-job").build(new Date()), false); + + ClusterState clusterState = ClusterState.builder(new ClusterName("finalize-job-action-tests")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlBuilder.build())) + .build(); + + FinalizeJobExecutionAction.Request request = new FinalizeJobExecutionAction.Request(new String[]{"cs-job"}); + AtomicReference ack = new AtomicReference<>(); + action.masterOperation(request, clusterState, ActionListener.wrap( + ack::set, + e -> fail(e.getMessage()) + )); + + verify(clusterService, times(1)).submitStateUpdateTask(any(), any()); + } + + private TransportFinalizeJobExecutionAction createAction(ClusterService clusterService) { + return new TransportFinalizeJobExecutionAction(Settings.EMPTY, mock(TransportService.class), clusterService, + mock(ThreadPool.class), mock(ActionFilters.class), mock(IndexNameExpressionResolver.class)); + + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index 41907d52dd048..4aba53d2614cb 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.PutJobAction; +import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; @@ -41,6 +42,8 @@ import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.config.RuleScope; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzerTests; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; @@ -50,7 +53,6 @@ import org.elasticsearch.xpack.ml.notifications.Auditor; import org.junit.Before; import org.mockito.ArgumentCaptor; -import org.mockito.Matchers; import org.mockito.Mockito; import java.io.IOException; @@ -66,6 +68,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.stream.Collectors; import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder; @@ -80,8 +83,10 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -495,7 +500,7 @@ public void testPutJob_AddsCreateTime() throws IOException { AckedClusterStateUpdateTask task = (AckedClusterStateUpdateTask) invocation.getArguments()[1]; task.onAllNodesAcked(null); return null; - }).when(clusterService).submitStateUpdateTask(Matchers.eq("put-job-foo"), any(AckedClusterStateUpdateTask.class)); + }).when(clusterService).submitStateUpdateTask(eq("put-job-foo"), any(AckedClusterStateUpdateTask.class)); ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(Job.class); doAnswer(invocation -> { @@ -850,6 +855,42 @@ public void testUpdateProcessOnCalendarChanged_GivenGroups() throws IOException assertThat(capturedUpdateParams.get(1).isUpdateScheduledEvents(), is(true)); } + public void testRevertSnapshot_GivenJobInClusterState() { + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(buildJobBuilder("cs-revert").build(), false); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + when(clusterService.state()).thenReturn(clusterState); + + ClusterSettings clusterSettings = new ClusterSettings(environment.settings(), + Collections.singleton(MachineLearningField.MAX_MODEL_MEMORY_LIMIT)); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + + JobConfigProvider jobConfigProvider = mock(JobConfigProvider.class); + JobManager jobManager = new JobManager(environment, environment.settings(), jobResultsProvider, clusterService, + auditor, threadPool, mock(Client.class), updateJobProcessNotifier, jobConfigProvider); + + RevertModelSnapshotAction.Request request = new RevertModelSnapshotAction.Request("cs-revert", "ms-1"); + + ModelSnapshot modelSnapshot = mock(ModelSnapshot.class); + ModelSizeStats modelSizeStats = mock(ModelSizeStats.class); + when(modelSnapshot.getModelSizeStats()).thenReturn(modelSizeStats); + + + doAnswer(invocation -> { + Consumer listener = (Consumer) invocation.getArguments()[3]; + listener.accept(100L); + return null; + }).when(jobResultsProvider).getEstablishedMemoryUsage(eq("cs-revert"), any(), any(), any(), any()); + + jobManager.revertSnapshot(request, mock(ActionListener.class), modelSnapshot); + verify(clusterService, times(1)).submitStateUpdateTask(eq("revert-snapshot-cs-revert"), any(AckedClusterStateUpdateTask.class)); + verify(jobConfigProvider, never()).updateJob(any(), any(), any(), any()); + } + private Job.Builder createJob() { Detector.Builder d1 = new Detector.Builder("info_content", "domain"); d1.setOverFieldName("client");