Skip to content

Commit

Permalink
Merge pull request #103 from upserve/fix_counter_store_test
Browse files Browse the repository at this point in the history
Fix test that caused intermittent failure
  • Loading branch information
dstuebe authored Aug 26, 2019
2 parents a28abd8 + fd16110 commit 8515f69
Showing 1 changed file with 17 additions and 32 deletions.
49 changes: 17 additions & 32 deletions src/test/java/com/upserve/uppend/CounterStoreTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.*;

import static org.junit.Assert.*;

Expand Down Expand Up @@ -217,36 +218,20 @@ public void testExample() {

@Test
public void testParallelWriteThenRead() throws Exception {
// TODO this test fails intermittently
final int numKeys = 1000;
final int totalIncrements = 1_000_000;
log.info("parallel: starting {} keys, {} total increments", numKeys, totalIncrements);
long[] vals = new long[numKeys];
ArrayList<Runnable> jobs = new ArrayList<>();
Random rand = new Random();
log.info("parallel: creating jobs");
for (int i = 0; i < totalIncrements; i++) {
int keyNum = rand.nextInt(numKeys);
vals[keyNum]++;
String key = String.format("k%010d", keyNum);
jobs.add(() -> store.increment("my_partition", key));
}
Collections.shuffle(jobs);
ArrayList<ForkJoinTask> futures = new ArrayList<>();
log.info("parallel: submitting jobs");
jobs.forEach(job -> futures.add(ForkJoinPool.commonPool().submit(job)));
log.info("parallel: waiting for jobs");
futures.forEach(ForkJoinTask::join);

log.info("parallel: flushing");
store.flush();

log.info("parallel: comparing");
for (int i = 0; i < vals.length; i++) {
long val = vals[i];
String key = String.format("k%010d", i);
assertEquals("expected value " + (i + 1) + "/" + vals.length + " to match", Long.valueOf(val), store.get("my_partition", key));
}
log.info("parallel: done");
int keys = 512;
LongAdder[] counts = IntStream.range(0, keys).mapToObj(v -> new LongAdder()).toArray(LongAdder[]::new);
new Random()
.ints(500_000, 0, keys)
.parallel()
.forEach(i -> {
store.increment(String.format("p%03d", (i % 5)), String.format("k%04d" , i));
counts[i].increment();
});
IntStream.range(0, keys).forEach(i -> {
assertEquals(
counts[i].longValue() > 0 ? counts[i].longValue() : null,
store.get(String.format("p%03d", (i % 5)), String.format("k%04d" , i))
);
});
}
}

0 comments on commit 8515f69

Please sign in to comment.