diff --git a/src/test/java/com/upserve/uppend/CounterStoreTest.java b/src/test/java/com/upserve/uppend/CounterStoreTest.java index dd90ffe7..e0dd9789 100644 --- a/src/test/java/com/upserve/uppend/CounterStoreTest.java +++ b/src/test/java/com/upserve/uppend/CounterStoreTest.java @@ -11,7 +11,8 @@ import java.nio.file.*; import java.util.*; import java.util.concurrent.*; -import java.util.stream.Collectors; +import java.util.concurrent.atomic.LongAdder; +import java.util.stream.*; import static org.junit.Assert.*; @@ -217,36 +218,20 @@ public void testExample() { @Test public void testParallelWriteThenRead() throws Exception { - // TODO this test fails intermittently - final int numKeys = 1000; - final int totalIncrements = 1_000_000; - log.info("parallel: starting {} keys, {} total increments", numKeys, totalIncrements); - long[] vals = new long[numKeys]; - ArrayList jobs = new ArrayList<>(); - Random rand = new Random(); - log.info("parallel: creating jobs"); - for (int i = 0; i < totalIncrements; i++) { - int keyNum = rand.nextInt(numKeys); - vals[keyNum]++; - String key = String.format("k%010d", keyNum); - jobs.add(() -> store.increment("my_partition", key)); - } - Collections.shuffle(jobs); - ArrayList futures = new ArrayList<>(); - log.info("parallel: submitting jobs"); - jobs.forEach(job -> futures.add(ForkJoinPool.commonPool().submit(job))); - log.info("parallel: waiting for jobs"); - futures.forEach(ForkJoinTask::join); - - log.info("parallel: flushing"); - store.flush(); - - log.info("parallel: comparing"); - for (int i = 0; i < vals.length; i++) { - long val = vals[i]; - String key = String.format("k%010d", i); - assertEquals("expected value " + (i + 1) + "/" + vals.length + " to match", Long.valueOf(val), store.get("my_partition", key)); - } - log.info("parallel: done"); + int keys = 512; + LongAdder[] counts = IntStream.range(0, keys).mapToObj(v -> new LongAdder()).toArray(LongAdder[]::new); + new Random() + .ints(500_000, 0, keys) + .parallel() + .forEach(i -> { + store.increment(String.format("p%03d", (i % 5)), String.format("k%04d" , i)); + counts[i].increment(); + }); + IntStream.range(0, keys).forEach(i -> { + assertEquals( + counts[i].longValue() > 0 ? counts[i].longValue() : null, + store.get(String.format("p%03d", (i % 5)), String.format("k%04d" , i)) + ); + }); } }