Skip to content

Commit

Permalink
Improved simulator runtime for large traces
Browse files Browse the repository at this point in the history
Unfortunately, the elegant approach of partitioning a stream of events
into batches using the Iterator view comes with a hidden performance
cost. Instead of picking off events one by one, the stream's iterator
creates and grows large internal buffers (SpinedBuffer) before the first
element is provided to the consumer. The maximum chunk size is 1M elements, which
can cause a long pause while the trace is read into memory before it
is processed by the simulator. For the DS1 trace this adds 10 seconds
of startup time for the simulator and doubles the time for the trace
rewriter.

The forEach operation pulls on demand, letting our smaller I/O buffers
and concurrent prefetching keep the policies fed with a lower startup
overhead. This also greatly reduces the memory footprint, making it
easier to analyze large caches. However, this coding structure is
slightly less pleasant.
  • Loading branch information
ben-manes committed Nov 28, 2020
1 parent 23cd333 commit 5cc2c7d
Show file tree
Hide file tree
Showing 11 changed files with 63 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;

import site.ycsb.generator.NumberGenerator;
import site.ycsb.generator.ScrambledZipfianGenerator;

Expand All @@ -45,7 +46,7 @@ public class ComputeBenchmark {
static final Function<Integer, Boolean> mappingFunction = any -> Boolean.TRUE;
static final CacheLoader<Integer, Boolean> cacheLoader = CacheLoader.from(key -> Boolean.TRUE);

@Param({"ConcurrentHashMap", "Caffeine", "Guava", "Rapidoid"})
@Param({"ConcurrentHashMap", "Caffeine", "Guava"})
String computeType;

Function<Integer, Boolean> benchmarkFunction;
Expand Down
4 changes: 2 additions & 2 deletions gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ ext {
easymock: '4.2',
hamcrest: '2.2',
jcacheTck: '1.1.1',
jctools: '3.1.0',
jctools: '3.2.0',
junit: '4.13.1',
mockito: '3.6.0',
mockito: '3.6.28',
paxExam: '4.13.4',
testng: '7.3.0',
truth: '0.24',
Expand Down
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
distributionUrl=https\://services.gradle.org/distributions/gradle-6.7.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-6.8-rc-1-bin.zip
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@
import static com.github.benmanes.caffeine.cache.simulator.Simulator.Message.INIT;
import static com.github.benmanes.caffeine.cache.simulator.Simulator.Message.START;
import static java.util.stream.Collectors.toList;
import static scala.collection.JavaConverters.seqAsJavaList;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;

import com.github.benmanes.caffeine.cache.simulator.parser.TraceFormat;
import com.github.benmanes.caffeine.cache.simulator.parser.TraceReader;
import com.github.benmanes.caffeine.cache.simulator.policy.AccessEvent;
Expand All @@ -34,7 +39,6 @@
import com.github.benmanes.caffeine.cache.simulator.policy.Registry;
import com.github.benmanes.caffeine.cache.simulator.report.Reporter;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterators;
import com.typesafe.config.Config;

import akka.actor.AbstractActor;
Expand Down Expand Up @@ -72,8 +76,6 @@ public enum Message { INIT, START, FINISH, ERROR }
private Stopwatch stopwatch;
private Reporter reporter;
private Router router;
private int batchSize;
private int remaining;

@Override
public void preStart() {
Expand All @@ -98,32 +100,36 @@ public Receive createReceive() {
private void initialize() {
Config config = context().system().settings().config().getConfig("caffeine.simulator");
settings = new BasicSettings(config);
traceReader = makeTraceReader();

List<Routee> routes = makeRoutes();
router = new Router(new BroadcastRoutingLogic(), routes);
remaining = routes.size();

batchSize = settings.batchSize();
traceReader = makeTraceReader();
stopwatch = Stopwatch.createStarted();
router = new Router(new BroadcastRoutingLogic(), makeRoutes());
reporter = settings.report().format().create(config, traceReader.characteristics());

self().tell(START, self());
}

/** Broadcast the trace events to all of the policy actors. */
private void broadcast() {
if (remaining == 0) {
if (seqAsJavaList(router.routees()).isEmpty()) {
context().system().log().error("No active policies in the current configuration");
context().stop(self());
return;
}

long skip = settings.trace().skip();
long limit = settings.trace().limit();
int batchSize = settings.batchSize();
try (Stream<AccessEvent> events = traceReader.events().skip(skip).limit(limit)) {
Iterators.partition(events.iterator(), batchSize)
.forEachRemaining(batch -> router.route(batch, self()));
Mutable<List<AccessEvent>> batch = new MutableObject<>(new ArrayList<>(batchSize));
events.forEach(event -> {
batch.getValue().add(event);
if (batch.getValue().size() == batchSize) {
router.route(batch.getValue(), self());
batch.setValue(new ArrayList<>(batchSize));
}
});
router.route(batch.getValue(), self());
router.route(FINISH, self());
}
}
Expand All @@ -150,9 +156,8 @@ private List<Routee> makeRoutes() {
/** Add the stats to the reporter, print if completed, and stop the simulator. */
private void reportStats(PolicyStats stats) throws IOException {
reporter.add(stats);
remaining--;

if (remaining == 0) {
if (reporter.stats().size() == seqAsJavaList(router.routees()).size()) {
reporter.print();
context().stop(self());
System.out.println("Executed in " + stopwatch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ void ensureCapacity(@NonNegative long expectedInsertions, @NonNegative double fp
return;
} else if (optimalSize == 0) {
tableShift = Integer.SIZE - 1;
table = new long[1];
table = new long[2];
} else {
int powerOfTwoShift = Integer.SIZE - Integer.numberOfLeadingZeros(optimalSize - 1);
tableShift = Integer.SIZE - powerOfTwoShift;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

Expand Down Expand Up @@ -64,18 +63,22 @@ public final class Rewriter implements Runnable {
@Override
@SuppressWarnings("PMD.ForLoopCanBeForeach")
public void run() {
int tick = 0;
Stopwatch stopwatch = Stopwatch.createStarted();
try (OutputStream output = new BufferedOutputStream(Files.newOutputStream(outputFile));
Stream<AccessEvent> events = inputFormat.readFiles(inputFiles).events();
TraceWriter writer = outputFormat.writer(output)) {
int[] tick = { 0 };
writer.writeHeader();
for (Iterator<AccessEvent> i = events.iterator(); i.hasNext();) {
writer.writeEvent(tick, i.next());
tick++;
}
events.forEach(event -> {
try {
writer.writeEvent(tick[0], event);
tick[0]++;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
writer.writeFooter();
System.out.printf("Rewrote %,d events in %s%n", tick, stopwatch);
System.out.printf("Rewrote %,d events in %s%n", tick[0], stopwatch);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* @author ben.manes@gmail.com (Ben Manes)
*/
public class AccessEvent {
private final Long key;
private final long key;

// Private constructor that stores the given key.
// NOTE(review): instances are presumably created through factory methods not shown in this hunk — confirm.
private AccessEvent(long key) {
this.key = key;
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ private void process(List<AccessEvent> events) {
}
} catch (Exception e) {
sender().tell(ERROR, self());
context().system().stop(self());
context().system().log().error(e, "");
} finally {
policy.stats().stopwatch().stop();
Expand All @@ -75,6 +76,7 @@ private void finish() {
sender().tell(policy.stats(), self());
} catch (Exception e) {
sender().tell(ERROR, self());
context().system().stop(self());
context().system().log().error(e, "");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.github.benmanes.caffeine.cache.simulator.report;

import java.io.IOException;
import java.util.Collection;

import com.github.benmanes.caffeine.cache.simulator.policy.PolicyStats;

Expand All @@ -31,4 +32,7 @@ public interface Reporter {

/** Writes the report to the output destination. */
void print() throws IOException;

/** Returns the collected statistics. */
Collection<PolicyStats> stats();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -55,13 +56,11 @@ protected TextReporter(Config config, Set<Characteristic> characteristics) {
this.results = new ArrayList<>();
}

/** Adds the result of a policy simulation. */
@Override
public void add(PolicyStats policyStats) {
// Appended in arrival order; print() sorts this list with comparator() before using it.
results.add(policyStats);
}

/** Writes the report to the output destination. */
@Override
public void print() throws IOException {
results.sort(comparator());
Expand All @@ -74,6 +73,11 @@ public void print() throws IOException {
}
}

/**
 * Returns the statistics added so far. This is the live backing list rather than a copy, so it
 * reflects later {@code add} calls and is reordered when {@code print} sorts the results.
 */
@Override
public Collection<PolicyStats> stats() {
return results;
}

/** Returns the column headers. */
protected Set<String> headers() {
if (headers == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package com.github.benmanes.caffeine.cache.simulator.admission.bloom;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;

Expand All @@ -29,6 +31,7 @@

import com.github.benmanes.caffeine.cache.simulator.membership.FilterType;
import com.github.benmanes.caffeine.cache.simulator.membership.Membership;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.jakewharton.fliptables.FlipTable;
import com.typesafe.config.Config;
Expand All @@ -39,15 +42,20 @@
* @author ben.manes@gmail.com (Ben Manes)
*/
public class MembershipTest {
static final String[] HEADERS = { "Type", "Insertions", "False Positives" };
static final String[] HEADERS = { "Type", "Capacity", "Insertions", "False Positives" };
static final double EXPECTED_INSERTIONS_MULTIPLIER = 0.5;
static final double FPP = 0.03;

static final boolean display = false;

@Test(dataProvider = "filterTypes")
public void bloomFilterTest(FilterType filterType) {
List<Integer> capacities = new ArrayList<>(ImmutableList.of(0, 1));
for (int capacity = 2 << 10; capacity < (2 << 22); capacity = capacity << 2) {
capacities.add(capacity);
}

for (int capacity : capacities) {
long[] input = new Random().longs(capacity).distinct().toArray();
Config config = getConfig(filterType, capacity);
List<String[]> rows = new ArrayList<>();
Expand All @@ -56,8 +64,9 @@ public void bloomFilterTest(FilterType filterType) {
int falsePositives = falsePositives(filter, input);
int expectedInsertions = (int) (capacity * EXPECTED_INSERTIONS_MULTIPLIER);
double falsePositiveRate = ((double) falsePositives / expectedInsertions);
assertThat(filterType.toString(), falsePositiveRate, is(lessThan(FPP + 0.2)));
rows.add(row(filterType, expectedInsertions, falsePositives, falsePositiveRate));
assertThat(filterType.toString(), falsePositiveRate,
is(either(equalTo(Double.NaN)).or(lessThan(FPP + 0.2))));
rows.add(row(filterType, capacity, expectedInsertions, falsePositives, falsePositiveRate));

if (display) {
printTable(rows);
Expand Down Expand Up @@ -105,10 +114,11 @@ private Config getConfig(FilterType filterType, int capacity) {
}

/** Returns a table row for printing the false positive rates of an implementation. */
private static String[] row(FilterType filterType, int expectedInsertions,
int falsePositives, double falsePositiveRate) {
private static String[] row(FilterType filterType, int capacity,
int expectedInsertions, int falsePositives, double falsePositiveRate) {
return new String[] {
filterType.toString(),
String.format("%,d", capacity),
String.format("%,d", expectedInsertions),
String.format("%,d (%.2f %%)", falsePositives, 100 * falsePositiveRate),
};
Expand Down

0 comments on commit 5cc2c7d

Please sign in to comment.