diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index d75f5dec96c24..954ac829a0b9c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -1367,7 +1367,7 @@ public void testClusterRecoversAfterExceptionDuringSerialization() { assertThat(e.getCause().getMessage(), equalTo(BrokenCustom.EXCEPTION_MESSAGE)); failed.set(true); }); - cluster.runFor(DEFAULT_DELAY_VARIABILITY + 1, "processing broken task"); + cluster.runFor(2 * DEFAULT_DELAY_VARIABILITY + 1, "processing broken task"); assertTrue(failed.get()); cluster.stabilise(); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java b/test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java index dfa11d8beb049..2503b1d5946f4 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java @@ -126,7 +126,7 @@ protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs task assert waitForPublish == false; waitForPublish = true; final AckListener ackListener = taskOutputs.createAckListener(threadPool, clusterChangedEvent.state()); - clusterStatePublisher.publish(clusterChangedEvent, new ActionListener() { + final ActionListener publishListener = new ActionListener<>() { private boolean listenerCalled = false; @@ -157,7 +157,20 @@ public void onFailure(Exception e) { scheduleNextTaskIfNecessary(); } } - }, wrapAckListener(ackListener)); + }; + threadPool.generic().execute(threadPool.getThreadContext().preserveContext(new Runnable() { + @Override + public void run() { + clusterStatePublisher.publish(clusterChangedEvent, publishListener, wrapAckListener(ackListener)); + } + + @Override + public String toString() { + return "publish change of cluster state from version [" + clusterChangedEvent.previousState().version() + "] in term [" + + clusterChangedEvent.previousState().term() + "] to version [" + clusterChangedEvent.state().version() + + "] in term [" + clusterChangedEvent.state().term() + "]"; + } + })); } protected AckListener wrapAckListener(AckListener ackListener) { diff --git a/test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java b/test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java index 3fd7b8bb981ab..e297be7be0c2a 100644 --- a/test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java +++ b/test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java @@ -36,10 +36,15 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -56,6 +61,11 @@ public void testFakeMasterService() { final ThreadContext context = new ThreadContext(Settings.EMPTY); final ThreadPool mockThreadPool = mock(ThreadPool.class); when(mockThreadPool.getThreadContext()).thenReturn(context); + + final ExecutorService executorService = mock(ExecutorService.class); + doAnswer(invocationOnMock -> runnableTasks.add((Runnable) invocationOnMock.getArguments()[0])).when(executorService).execute(any()); + when(mockThreadPool.generic()).thenReturn(executorService); + FakeThreadPoolMasterService masterService = new FakeThreadPoolMasterService("test_node","test", mockThreadPool, runnableTasks::add); masterService.setClusterStateSupplier(lastClusterStateRef::get); masterService.setClusterStatePublisher((event, publishListener, ackListener) -> { @@ -89,7 +99,14 @@ public void onFailure(String source, Exception e) { assertNull(publishingCallback.get()); assertFalse(firstTaskCompleted.get()); - runnableTasks.remove(0).run(); + final Runnable scheduleTask = runnableTasks.remove(0); + assertThat(scheduleTask, hasToString("master service scheduling next task")); + scheduleTask.run(); + + final Runnable publishTask = runnableTasks.remove(0); + assertThat(publishTask, hasToString(containsString("publish change of cluster state"))); + publishTask.run(); + assertThat(lastClusterStateRef.get().metadata().indices().size(), equalTo(1)); assertThat(lastClusterStateRef.get().version(), equalTo(firstClusterStateVersion + 1)); assertNotNull(publishingCallback.get()); @@ -121,7 +138,8 @@ public void onFailure(String source, Exception e) { assertTrue(firstTaskCompleted.get()); assertThat(runnableTasks.size(), equalTo(1)); // check that new task gets queued - runnableTasks.remove(0).run(); + runnableTasks.remove(0).run(); // schedule again + runnableTasks.remove(0).run(); // publish again assertThat(lastClusterStateRef.get().metadata().indices().size(), equalTo(2)); assertThat(lastClusterStateRef.get().version(), equalTo(firstClusterStateVersion + 2)); assertNotNull(publishingCallback.get());