diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java index b7d8e096f3d23..713b0c43a8c9e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java @@ -28,6 +28,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; +import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; @@ -47,6 +48,9 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; + public class TransportCloseJobAction extends TransportTasksAction { @@ -422,7 +426,10 @@ void waitForJobClosed(CloseJobAction.Request request, WaitForCloseRequest waitFo }, request.getCloseTimeout(), new ActionListener() { @Override public void onResponse(Boolean result) { - listener.onResponse(response); + FinalizeJobExecutionAction.Request finalizeRequest = new FinalizeJobExecutionAction.Request( + waitForCloseRequest.jobsToFinalize.toArray(new String[0])); + executeAsyncWithOrigin(client, ML_ORIGIN, FinalizeJobExecutionAction.INSTANCE, finalizeRequest, + ActionListener.wrap(r -> listener.onResponse(response), listener::onFailure)); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java index f8e0f2e862957..d6c03d6c93fbf 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java @@ -7,8 +7,12 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.action.update.UpdateAction; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -18,15 +22,31 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.utils.ChainTaskExecutor; + +import java.util.Collections; +import java.util.Date; +import java.util.Map; + +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; public class TransportFinalizeJobExecutionAction extends TransportMasterNodeAction { + private final Client client; + @Inject public TransportFinalizeJobExecutionAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + Client client) { super(FinalizeJobExecutionAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, FinalizeJobExecutionAction.Request::new); + this.client = client; } @Override @@ -42,12 +62,36 @@ protected AcknowledgedResponse newResponse() { @Override protected void masterOperation(FinalizeJobExecutionAction.Request request, ClusterState state, ActionListener listener) { - // This action is no longer required but needs to be preserved - // in case it is called by an old node in a mixed cluster - listener.onResponse(new AcknowledgedResponse(true)); + String jobIdString = String.join(",", request.getJobIds()); + logger.debug("finalizing jobs [{}]", jobIdString); + + ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(threadPool.executor( + MachineLearning.UTILITY_THREAD_POOL_NAME), true); + + Map update = Collections.singletonMap(Job.FINISHED_TIME.getPreferredName(), new Date()); + + for (String jobId: request.getJobIds()) { + UpdateRequest updateRequest = new UpdateRequest(AnomalyDetectorsIndex.configIndexName(), + ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId)); + updateRequest.retryOnConflict(3); + updateRequest.doc(update); + updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + chainTaskExecutor.add(chainedListener -> { + executeAsyncWithOrigin(client, ML_ORIGIN, UpdateAction.INSTANCE, updateRequest, ActionListener.wrap( + updateResponse -> chainedListener.onResponse(null), + chainedListener::onFailure + )); + }); + } - // TODO restore functionality for index jobs + chainTaskExecutor.execute(ActionListener.wrap( + aVoid -> { + logger.debug("finalized job [{}]", jobIdString); + listener.onResponse(new AcknowledgedResponse(true)); + }, + listener::onFailure + )); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index f536b79547736..56bf487efa26d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -11,20 +11,16 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.action.update.UpdateAction; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.xpack.core.ml.MachineLearningField; -import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.action.PutJobAction; +import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; +import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; import org.elasticsearch.xpack.core.ml.job.messages.Messages; -import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; -import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; @@ -36,7 +32,6 @@ import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats; import org.elasticsearch.xpack.core.ml.job.results.Influencer; import org.elasticsearch.xpack.core.ml.job.results.ModelPlot; -import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; @@ -44,12 +39,8 @@ import org.elasticsearch.xpack.ml.notifications.Auditor; import java.time.Duration; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; @@ -88,7 +79,6 @@ public class AutoDetectResultProcessor { final CountDownLatch completionLatch = new CountDownLatch(1); final Semaphore updateModelSnapshotSemaphore = new Semaphore(1); - volatile CountDownLatch onCloseActionsLatch; private final FlushListener flushListener; private volatile boolean processKilled; private volatile boolean failed; @@ -149,18 +139,8 @@ public void process(AutodetectProcess process) { } catch (Exception e) { LOGGER.warn(new ParameterizedMessage("[{}] Error persisting autodetect results", jobId), e); } - if (processKilled == false) { - try { - onAutodetectClose(); - } catch (Exception e) { - if (onCloseActionsLatch != null) { - onCloseActionsLatch.countDown(); - } - throw e; - } - } - LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, bucketCount); + } catch (Exception e) { failed = true; @@ -313,6 +293,9 @@ private void notifyModelMemoryStatusChange(Context context, ModelSizeStats model } protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) { + JobUpdate update = new JobUpdate.Builder(jobId).setModelSnapshotId(modelSnapshot.getSnapshotId()).build(); + UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update); + try { // This blocks the main processing thread in the unlikely event // there are 2 model snapshots queued up. But it also has the @@ -324,52 +307,20 @@ protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) { return; } - Map update = new HashMap<>(); - update.put(Job.MODEL_SNAPSHOT_ID.getPreferredName(), modelSnapshot.getSnapshotId()); - update.put(Job.MODEL_SNAPSHOT_MIN_VERSION.getPreferredName(), modelSnapshot.getMinVersion().toString()); - - updateJob(jobId, update, new ActionListener() { - @Override - public void onResponse(UpdateResponse updateResponse) { - updateModelSnapshotSemaphore.release(); - LOGGER.debug("[{}] Updated job with model snapshot id [{}]", jobId, modelSnapshot.getSnapshotId()); - } - - @Override - public void onFailure(Exception e) { - updateModelSnapshotSemaphore.release(); - LOGGER.error("[" + jobId + "] Failed to update job with new model snapshot id [" + - modelSnapshot.getSnapshotId() + "]", e); - } - }); - } - - private void onAutodetectClose() { - onCloseActionsLatch = new CountDownLatch(1); - - ActionListener updateListener = ActionListener.wrap( - updateResponse -> { - onCloseActionsLatch.countDown(); - }, - e -> { - LOGGER.error("[" + jobId + "] Failed to finalize job on autodetect close", e); - onCloseActionsLatch.countDown(); - } - ); - - updateJob(jobId, Collections.singletonMap(Job.FINISHED_TIME.getPreferredName(), new Date()), - new ThreadedActionListener<>(LOGGER, client.threadPool(), - MachineLearning.UTILITY_THREAD_POOL_NAME, updateListener, false) - ); - } + executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, new ActionListener() { + @Override + public void onResponse(PutJobAction.Response response) { + updateModelSnapshotSemaphore.release(); + LOGGER.debug("[{}] Updated job with model snapshot id [{}]", jobId, modelSnapshot.getSnapshotId()); + } - private void updateJob(String jobId, Map update, ActionListener listener) { - UpdateRequest updateRequest = new UpdateRequest(AnomalyDetectorsIndex.configIndexName(), - ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId)); - updateRequest.retryOnConflict(3); - updateRequest.doc(update); - updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - executeAsyncWithOrigin(client, ML_ORIGIN, UpdateAction.INSTANCE, updateRequest, listener); + @Override + public void onFailure(Exception e) { + updateModelSnapshotSemaphore.release(); + LOGGER.error("[" + jobId + "] Failed to update job with new model snapshot id [" + + modelSnapshot.getSnapshotId() + "]", e); + } + }); } public void awaitCompletion() throws TimeoutException { @@ -381,13 +332,6 @@ public void awaitCompletion() throws TimeoutException { throw new TimeoutException("Timed out waiting for results processor to complete for job " + jobId); } - // Once completionLatch has passed then onCloseActionsLatch must either - // be set or null, it will not be set later. - if (onCloseActionsLatch != null && onCloseActionsLatch.await( - MachineLearningField.STATE_PERSIST_RESTORE_TIMEOUT.getMinutes(), TimeUnit.MINUTES) == false) { - throw new TimeoutException("Timed out waiting for results processor run post close actions " + jobId); - } - // Input stream has been completely processed at this point. // Wait for any updateModelSnapshotOnJob calls to complete. updateModelSnapshotSemaphore.acquire(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionActionTests.java new file mode 100644 index 0000000000000..fc44c520ebfc7 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionActionTests.java @@ -0,0 +1,89 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.update.UpdateAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.junit.Before; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TransportFinalizeJobExecutionActionTests extends ESTestCase { + + private ThreadPool threadPool; + private Client client; + + @Before + @SuppressWarnings("unchecked") + private void setupMocks() { + ExecutorService executorService = mock(ExecutorService.class); + threadPool = mock(ThreadPool.class); + org.elasticsearch.mock.orig.Mockito.doAnswer(invocation -> { + ((Runnable) invocation.getArguments()[0]).run(); + return null; + }).when(executorService).execute(any(Runnable.class)); + when(threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)).thenReturn(executorService); + + client = mock(Client.class); + doAnswer( invocationOnMock -> { + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(null); + return null; + }).when(client).execute(eq(UpdateAction.INSTANCE), any(), any()); + + when(client.threadPool()).thenReturn(threadPool); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + } + + public void testOperation() { + ClusterService clusterService = mock(ClusterService.class); + TransportFinalizeJobExecutionAction action = createAction(clusterService); + + ClusterState clusterState = ClusterState.builder(new ClusterName("finalize-job-action-tests")).build(); + + FinalizeJobExecutionAction.Request request = new FinalizeJobExecutionAction.Request(new String[]{"job1", "job2"}); + AtomicReference ack = new AtomicReference<>(); + action.masterOperation(request, clusterState, ActionListener.wrap( + ack::set, + e -> assertNull(e.getMessage()) + )); + + assertTrue(ack.get().isAcknowledged()); + verify(client, times(2)).execute(eq(UpdateAction.INSTANCE), any(), any()); + verify(clusterService, never()).submitStateUpdateTask(any(), any()); + } + + private TransportFinalizeJobExecutionAction createAction(ClusterService clusterService) { + return new TransportFinalizeJobExecutionAction(mock(TransportService.class), clusterService, + threadPool, mock(ActionFilters.class), mock(IndexNameExpressionResolver.class), client); + + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index 761cb56fa8804..807ac81830904 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -8,12 +8,8 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.Version; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.action.update.UpdateAction; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -23,7 +19,8 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; +import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; @@ -34,7 +31,6 @@ import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition; import org.elasticsearch.xpack.core.ml.job.results.Influencer; import org.elasticsearch.xpack.core.ml.job.results.ModelPlot; -import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; @@ -42,7 +38,6 @@ import org.elasticsearch.xpack.ml.notifications.Auditor; import org.junit.After; import org.junit.Before; -import org.mockito.ArgumentCaptor; import org.mockito.InOrder; import java.time.Duration; @@ -51,7 +46,6 @@ import java.util.Date; import java.util.Iterator; import java.util.List; -import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -62,7 +56,6 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -90,21 +83,9 @@ public class AutoDetectResultProcessorTests extends ESTestCase { public void setUpMocks() { executor = new ScheduledThreadPoolExecutor(1); client = mock(Client.class); - doAnswer(invocation -> { - @SuppressWarnings("unchecked") - ActionListener listener = (ActionListener) invocation.getArguments()[2]; - listener.onResponse(new UpdateResponse()); - return null; - }).when(client).execute(same(UpdateAction.INSTANCE), any(), any()); threadPool = mock(ThreadPool.class); when(client.threadPool()).thenReturn(threadPool); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); - ExecutorService executorService = mock(ExecutorService.class); - org.elasticsearch.mock.orig.Mockito.doAnswer(invocation -> { - ((Runnable) invocation.getArguments()[0]).run(); - return null; - }).when(executorService).execute(any(Runnable.class)); - when(threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)).thenReturn(executorService); auditor = mock(Auditor.class); renormalizer = mock(Renormalizer.class); persister = mock(JobResultsPersister.class); @@ -133,9 +114,7 @@ public void testProcess() throws TimeoutException { processorUnderTest.process(process); processorUnderTest.awaitCompletion(); verify(renormalizer, times(1)).waitUntilIdle(); - verify(client, times(1)).execute(same(UpdateAction.INSTANCE), any(), any()); assertEquals(0, processorUnderTest.completionLatch.getCount()); - assertEquals(0, processorUnderTest.onCloseActionsLatch.getCount()); } public void testProcessResult_bucket() { @@ -352,12 +331,11 @@ public void testProcessResult_modelSnapshot() { verify(persister, times(1)).persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE); - ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(UpdateRequest.class); - verify(client).execute(same(UpdateAction.INSTANCE), requestCaptor.capture(), any()); - verifyNoMoreInteractions(persister); + UpdateJobAction.Request expectedJobUpdateRequest = UpdateJobAction.Request.internal(JOB_ID, + new JobUpdate.Builder(JOB_ID).setModelSnapshotId("a_snapshot_id").build()); - UpdateRequest capturedRequest = requestCaptor.getValue(); - assertNotNull(capturedRequest.doc().sourceAsMap().get(Job.MODEL_SNAPSHOT_ID.getPreferredName())); + verify(client).execute(same(UpdateJobAction.INSTANCE), eq(expectedJobUpdateRequest), any()); + verifyNoMoreInteractions(persister); } public void testProcessResult_quantiles_givenRenormalizationIsEnabled() { @@ -413,7 +391,6 @@ public void testAwaitCompletion() throws TimeoutException { processorUnderTest.awaitCompletion(); assertEquals(0, processorUnderTest.completionLatch.getCount()); assertEquals(1, processorUnderTest.updateModelSnapshotSemaphore.availablePermits()); - assertEquals(0, processorUnderTest.onCloseActionsLatch.getCount()); } public void testPersisterThrowingDoesntBlockProcessing() { @@ -467,7 +444,6 @@ public void testKill() throws TimeoutException { processorUnderTest.process(process); processorUnderTest.awaitCompletion(); - assertNull(processorUnderTest.onCloseActionsLatch); assertEquals(0, processorUnderTest.completionLatch.getCount()); assertEquals(1, processorUnderTest.updateModelSnapshotSemaphore.availablePermits()); @@ -477,7 +453,6 @@ public void testKill() throws TimeoutException { verify(renormalizer).shutdown(); verify(renormalizer, times(1)).waitUntilIdle(); verify(flushListener, times(1)).clear(); - verify(client, never()).execute(same(UpdateAction.INSTANCE), any(), any()); } private void setupScheduleDelayTime(TimeValue delay) {