From db5d434fef30ab96aeed7a6196260c8925a189f1 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 29 Nov 2022 09:04:03 +0000 Subject: [PATCH] Ensure cached time elapses in ClusterServiceIT Rather than just checking `System.nanoTime()` we should verify that each thread pool's cached time has elapsed here. --- .../cluster/service/ClusterServiceIT.java | 32 ++++++++++++++++--- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java index cd2f3ebf561d5..24950fe160e86 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java @@ -17,6 +17,7 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; +import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; import java.util.HashSet; @@ -25,6 +26,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.StreamSupport; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -414,11 +416,7 @@ public void onFailure(Exception e) { }); } - final var startNanoTime = System.nanoTime(); - while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startNanoTime, TimeUnit.NANOSECONDS) <= 0) { - // noinspection BusyWait - Thread.sleep(100); - } + waitForTimeToElapse(); pendingClusterTasks = clusterService.getMasterService().pendingTasks(); assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(5)); @@ -441,4 +439,28 @@ public void onFailure(Exception e) { block2.countDown(); } } + + private static void waitForTimeToElapse() throws InterruptedException { + final ThreadPool[] threadPools = StreamSupport.stream(internalCluster().getInstances(ClusterService.class).spliterator(), false) + .map(ClusterService::threadPool) + .toArray(ThreadPool[]::new); + final long[] startTimes = Arrays.stream(threadPools).mapToLong(ThreadPool::relativeTimeInMillis).toArray(); + + final var startNanoTime = System.nanoTime(); + while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startNanoTime, TimeUnit.NANOSECONDS) <= 100) { + // noinspection BusyWait + Thread.sleep(100); + } + + outer: do { + for (int i = 0; i < threadPools.length; i++) { + if (threadPools[i].relativeTimeInMillis() <= startTimes[i]) { + // noinspection BusyWait + Thread.sleep(100); + continue outer; + } + } + return; + } while (true); + } }