Skip to content

Commit

Permalink
Use parallel flux to ensure all cores are used (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
mikeharder authored Nov 21, 2019
1 parent d00e451 commit a546101
Showing 1 changed file with 8 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class PerfStressProgram {
private static int[] _completedOperations;
Expand Down Expand Up @@ -165,7 +166,12 @@ public static void RunTests(PerfStressTest<?>[] tests, boolean sync, int paralle
}
}
else {
Flux.range(0, parallel).flatMap(i -> RunLoopAsync(tests[i], i, endNanoTime)).blockLast();
Flux.range(0, parallel)
.parallel()
.runOn(Schedulers.boundedElastic())
.flatMap(i -> RunLoopAsync(tests[i], i, endNanoTime))
.then()
.block();
}

progressStatus.dispose();
Expand All @@ -174,7 +180,7 @@ public static void RunTests(PerfStressTest<?>[] tests, boolean sync, int paralle

int totalOperations = IntStream.of(_completedOperations).sum();
double operationsPerSecond = IntStream.range(0, parallel)
.mapToDouble(i -> ((double)_completedOperations[i]) / (_lastCompletionNanoTimes[i] / 1000000000))
.mapToDouble(i -> _completedOperations[i] / (((double)_lastCompletionNanoTimes[i]) / 1000000000))
.sum();
double secondsPerOperation = 1 / operationsPerSecond;
double weightedAverageSeconds = totalOperations / operationsPerSecond;
Expand Down

0 comments on commit a546101

Please sign in to comment.