diff --git a/metrics-benchmarks/src/main/java/com/codahale/metrics/benchmarks/ReservoirBenchmark.java b/metrics-benchmarks/src/main/java/com/codahale/metrics/benchmarks/ReservoirBenchmark.java index 5d2e7885ac..b7c6801d2f 100644 --- a/metrics-benchmarks/src/main/java/com/codahale/metrics/benchmarks/ReservoirBenchmark.java +++ b/metrics-benchmarks/src/main/java/com/codahale/metrics/benchmarks/ReservoirBenchmark.java @@ -1,6 +1,8 @@ package com.codahale.metrics.benchmarks; import com.codahale.metrics.ExponentiallyDecayingReservoir; +import com.codahale.metrics.LockFreeExponentiallyDecayingReservoir; +import com.codahale.metrics.Reservoir; import com.codahale.metrics.SlidingTimeWindowArrayReservoir; import com.codahale.metrics.SlidingTimeWindowReservoir; import com.codahale.metrics.SlidingWindowReservoir; @@ -23,6 +25,7 @@ public class ReservoirBenchmark { private final UniformReservoir uniform = new UniformReservoir(); private final ExponentiallyDecayingReservoir exponential = new ExponentiallyDecayingReservoir(); + private final Reservoir lockFreeExponential = LockFreeExponentiallyDecayingReservoir.builder().build(); private final SlidingWindowReservoir sliding = new SlidingWindowReservoir(1000); private final SlidingTimeWindowReservoir slidingTime = new SlidingTimeWindowReservoir(200, TimeUnit.MILLISECONDS); private final SlidingTimeWindowArrayReservoir arrTime = new SlidingTimeWindowArrayReservoir(200, TimeUnit.MILLISECONDS); @@ -60,6 +63,12 @@ public Object perfSlidingTimeWindowReservoir() { return slidingTime; } + @Benchmark + public Object perfLockFreeExponentiallyDecayingReservoir() { + lockFreeExponential.update(nextValue); + return lockFreeExponential; + } + public static void main(String[] args) throws RunnerException { Options opt = new OptionsBuilder() .include(".*" + ReservoirBenchmark.class.getSimpleName() + ".*") diff --git a/metrics-core/src/main/java/com/codahale/metrics/LockFreeExponentiallyDecayingReservoir.java 
package com.codahale.metrics;

import com.codahale.metrics.WeightedSnapshot.WeightedSample;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiConsumer;

/**
 * A lock-free exponentially-decaying random reservoir of {@code long}s. Uses Cormode et al's
 * forward-decaying priority reservoir sampling method to produce a statistically representative
 * sampling reservoir, exponentially biased towards newer entries.
 *
 * <p>{@link LockFreeExponentiallyDecayingReservoir} is based closely on the
 * {@link ExponentiallyDecayingReservoir}, however it provides looser guarantees while completely
 * avoiding locks.
 *
 * <p>Looser guarantees:
 * <ul>
 *   <li>Updates which race with a rescale may be applied to the state object that is concurrently
 *       being replaced, and are silently dropped when the compare-and-set installs the new state.
 *   <li>The sample count is incremented only after a sample has been inserted into the map, so the
 *       map may transiently contain more than {@code size} entries until the surplus
 *       minimum-priority entries have been evicted.
 * </ul>
 *
 * @see <a href="http://dimacs.rutgers.edu/~graham/pubs/papers/fwddecay.pdf">Cormode et al.
 *      Forward Decay: A Practical Time Decay Model for Streaming Systems. ICDE '09: Proceedings
 *      of the 2009 IEEE International Conference on Data Engineering (2009)</a>
 *
 * @author Carter Kozak
 */
public final class LockFreeExponentiallyDecayingReservoir implements Reservoir {

    private static final double SECONDS_PER_NANO = .000_000_001D;
    private static final AtomicReferenceFieldUpdater<LockFreeExponentiallyDecayingReservoir, State> stateUpdater =
            AtomicReferenceFieldUpdater.newUpdater(LockFreeExponentiallyDecayingReservoir.class, State.class, "state");

    private final int size;
    private final long rescaleThresholdNanos;
    private final Clock clock;

    private volatile State state;

    /**
     * Immutable-configuration snapshot of the reservoir between rescales. All mutation happens on
     * the concurrent map and the {@code count} field; a rescale installs a fresh {@code State} via
     * a single CAS on the enclosing reservoir.
     */
    private static final class State {

        private static final AtomicIntegerFieldUpdater<State> countUpdater =
                AtomicIntegerFieldUpdater.newUpdater(State.class, "count");

        private final double alphaNanos;
        private final int size;
        private final long startTick;
        // Count is updated after samples are successfully added to the map.
        private final ConcurrentSkipListMap<Double, WeightedSample> values;

        private volatile int count;

        State(double alphaNanos,
              int size,
              long startTick,
              int count,
              ConcurrentSkipListMap<Double, WeightedSample> values) {
            this.alphaNanos = alphaNanos;
            this.size = size;
            this.startTick = startTick;
            this.values = values;
            this.count = count;
        }

        private void update(long value, long timestampNanos) {
            double itemWeight = weight(timestampNanos - startTick);
            double priority = itemWeight / ThreadLocalRandom.current().nextDouble();
            boolean mapIsFull = count >= size;
            // Once the reservoir is full, only samples that out-prioritize the current minimum
            // are worth attempting to insert.
            if (!mapIsFull || values.firstKey() < priority) {
                addSample(priority, value, itemWeight, mapIsFull);
            }
        }

        private void addSample(double priority, long value, double itemWeight, boolean bypassIncrement) {
            // When the map was already full the count is left alone (bypassIncrement); in either
            // case an insertion that pushes the map beyond capacity evicts the lowest-priority entry.
            if (values.putIfAbsent(priority, new WeightedSample(value, itemWeight)) == null
                    && (bypassIncrement || countUpdater.incrementAndGet(this) > size)) {
                values.pollFirstEntry();
            }
        }

        /* "A common feature of the above techniques—indeed, the key technique that
         * allows us to track the decayed weights efficiently—is that they maintain
         * counts and other quantities based on g(ti − L), and only scale by g(t − L)
         * at query time. But while g(ti −L)/g(t−L) is guaranteed to lie between zero
         * and one, the intermediate values of g(ti − L) could become very large. For
         * polynomial functions, these values should not grow too large, and should be
         * effectively represented in practice by floating point values without loss of
         * precision. For exponential functions, these values could grow quite large as
         * new values of (ti − L) become large, and potentially exceed the capacity of
         * common floating point types. However, since the values stored by the
         * algorithms are linear combinations of g values (scaled sums), they can be
         * rescaled relative to a new landmark. That is, by the analysis of exponential
         * decay in Section III-A, the choice of L does not affect the final result. We
         * can therefore multiply each value based on L by a factor of exp(−α(L′ − L)),
         * and obtain the correct value as if we had instead computed relative to a new
         * landmark L′ (and then use this new L′ at query time). This can be done with
         * a linear pass over whatever data structure is being used."
         */
        State rescale(long newTick) {
            long durationNanos = newTick - startTick;
            double scalingFactor = Math.exp(-alphaNanos * durationNanos);
            int newCount = 0;
            ConcurrentSkipListMap<Double, WeightedSample> newValues = new ConcurrentSkipListMap<>();
            if (Double.compare(scalingFactor, 0) != 0) {
                RescalingConsumer consumer = new RescalingConsumer(scalingFactor, newValues);
                values.forEach(consumer);
                // make sure the counter is in sync with the number of stored samples.
                newCount = consumer.count;
            }
            // It's possible that more values were added while the map was scanned, those with the
            // minimum priorities are removed.
            while (newCount > size) {
                Objects.requireNonNull(newValues.pollFirstEntry(), "Expected an entry");
                newCount--;
            }
            return new State(alphaNanos, size, newTick, newCount, newValues);
        }

        private double weight(long durationNanos) {
            return Math.exp(alphaNanos * durationNanos);
        }
    }

    /**
     * Copies samples into a fresh map with weights and priorities multiplied by a constant
     * scaling factor, counting the entries that survive (zero-weight samples are dropped).
     */
    private static final class RescalingConsumer implements BiConsumer<Double, WeightedSample> {
        private final double scalingFactor;
        private final ConcurrentSkipListMap<Double, WeightedSample> values;
        private int count;

        RescalingConsumer(double scalingFactor, ConcurrentSkipListMap<Double, WeightedSample> values) {
            this.scalingFactor = scalingFactor;
            this.values = values;
        }

        @Override
        public void accept(Double priority, WeightedSample sample) {
            double newWeight = sample.weight * scalingFactor;
            // Zero-weight samples are excluded so the snapshot mean cannot become NaN.
            if (Double.compare(newWeight, 0) == 0) {
                return;
            }
            WeightedSample newSample = new WeightedSample(sample.value, newWeight);
            if (values.put(priority * scalingFactor, newSample) == null) {
                count++;
            }
        }
    }

    private LockFreeExponentiallyDecayingReservoir(int size, double alpha, Duration rescaleThreshold, Clock clock) {
        // Scale alpha to nanoseconds
        double alphaNanos = alpha * SECONDS_PER_NANO;
        this.size = size;
        this.clock = clock;
        this.rescaleThresholdNanos = rescaleThreshold.toNanos();
        this.state = new State(alphaNanos, size, clock.getTick(), 0, new ConcurrentSkipListMap<>());
    }

    @Override
    public int size() {
        return Math.min(size, state.count);
    }

    @Override
    public void update(long value) {
        long now = clock.getTick();
        rescaleIfNeeded(now).update(value, now);
    }

    private State rescaleIfNeeded(long currentTick) {
        // This method is optimized for size so the check may be quickly inlined.
        // Rescaling occurs substantially less frequently than the check itself.
        State stateSnapshot = this.state;
        if (currentTick - stateSnapshot.startTick >= rescaleThresholdNanos) {
            return doRescale(currentTick, stateSnapshot);
        }
        return stateSnapshot;
    }

    private State doRescale(long currentTick, State stateSnapshot) {
        State newState = stateSnapshot.rescale(currentTick);
        if (stateUpdater.compareAndSet(this, stateSnapshot, newState)) {
            // newState successfully installed
            return newState;
        }
        // Otherwise another thread has won the race and we can return the result of a volatile read.
        // It's possible this has taken so long that another update is required, however that's unlikely
        // and no worse than the standard race between a rescale and update.
        return this.state;
    }

    @Override
    public Snapshot getSnapshot() {
        State stateSnapshot = rescaleIfNeeded(clock.getTick());
        return new WeightedSnapshot(stateSnapshot.values.values());
    }

    public static Builder builder() {
        return new Builder();
    }

    /**
     * By default this uses a size of 1028 elements, which offers a 99.9%
     * confidence level with a 5% margin of error assuming a normal distribution, and an alpha
     * factor of 0.015, which heavily biases the reservoir to the past 5 minutes of measurements.
     */
    public static final class Builder {
        private static final int DEFAULT_SIZE = 1028;
        private static final double DEFAULT_ALPHA = 0.015D;
        private static final Duration DEFAULT_RESCALE_THRESHOLD = Duration.ofHours(1);

        private int size = DEFAULT_SIZE;
        private double alpha = DEFAULT_ALPHA;
        private Duration rescaleThreshold = DEFAULT_RESCALE_THRESHOLD;
        private Clock clock = Clock.defaultClock();

        private Builder() {}

        /**
         * Maximum number of samples to keep in the reservoir. Once this number is reached older samples are
         * replaced (based on weight, with some amount of random jitter).
         */
        public Builder size(int value) {
            if (value <= 0) {
                throw new IllegalArgumentException(
                        "LockFreeExponentiallyDecayingReservoir size must be positive: " + value);
            }
            this.size = value;
            return this;
        }

        /**
         * Alpha is the exponential decay factor. Higher values bias results more heavily toward newer values.
         */
        public Builder alpha(double value) {
            this.alpha = value;
            return this;
        }

        /**
         * Interval at which this reservoir is rescaled.
         */
        public Builder rescaleThreshold(Duration value) {
            this.rescaleThreshold = Objects.requireNonNull(value, "rescaleThreshold is required");
            return this;
        }

        /**
         * Clock instance used for decay.
         */
        public Builder clock(Clock value) {
            this.clock = Objects.requireNonNull(value, "clock is required");
            return this;
        }

        public Reservoir build() {
            return new LockFreeExponentiallyDecayingReservoir(size, alpha, rescaleThreshold, clock);
        }
    }
}
alpha, Clock clock) { + return new ExponentiallyDecayingReservoir(size, alpha, clock); + } + }, + + LOCK_FREE_EXPONENTIALLY_DECAYING() { + @Override + Reservoir create(int size, double alpha, Clock clock) { + return LockFreeExponentiallyDecayingReservoir.builder() + .size(size) + .alpha(alpha) + .clock(clock) + .build(); + } + }; + + abstract Reservoir create(int size, double alpha, Clock clock); + + Reservoir create(int size, double alpha) { + return create(size, alpha, Clock.defaultClock()); + } + } + + @Parameterized.Parameters(name = "{index}: {0}") + public static Collection reservoirs() { + return Arrays.stream(ReservoirFactory.values()) + .map(value -> new Object[] {value}) + .collect(Collectors.toList()); + } + + private final ReservoirFactory reservoirFactory; + + public ExponentiallyDecayingReservoirTest(ReservoirFactory reservoirFactory) { + this.reservoirFactory = reservoirFactory; + } + @Test public void aReservoirOf100OutOf1000Elements() { - final ExponentiallyDecayingReservoir reservoir = new ExponentiallyDecayingReservoir(100, 0.99); + final Reservoir reservoir = reservoirFactory.create(100, 0.99); for (int i = 0; i < 1000; i++) { reservoir.update(i); } @@ -30,7 +76,7 @@ public void aReservoirOf100OutOf1000Elements() { @Test public void aReservoirOf100OutOf10Elements() { - final ExponentiallyDecayingReservoir reservoir = new ExponentiallyDecayingReservoir(100, 0.99); + final Reservoir reservoir = reservoirFactory.create(100, 0.99); for (int i = 0; i < 10; i++) { reservoir.update(i); } @@ -48,7 +94,7 @@ public void aReservoirOf100OutOf10Elements() { @Test public void aHeavilyBiasedReservoirOf100OutOf1000Elements() { - final ExponentiallyDecayingReservoir reservoir = new ExponentiallyDecayingReservoir(1000, 0.01); + final Reservoir reservoir = reservoirFactory.create(1000, 0.01); for (int i = 0; i < 100; i++) { reservoir.update(i); } @@ -68,7 +114,7 @@ public void aHeavilyBiasedReservoirOf100OutOf1000Elements() { @Test public void 
longPeriodsOfInactivityShouldNotCorruptSamplingState() { final ManualClock clock = new ManualClock(); - final ExponentiallyDecayingReservoir reservoir = new ExponentiallyDecayingReservoir(10, 0.15, clock); + final Reservoir reservoir = reservoirFactory.create(10, 0.15, clock); // add 1000 values at a rate of 10 values/second for (int i = 0; i < 1000; i++) { @@ -102,7 +148,7 @@ public void longPeriodsOfInactivityShouldNotCorruptSamplingState() { @Test public void longPeriodsOfInactivity_fetchShouldResample() { final ManualClock clock = new ManualClock(); - final ExponentiallyDecayingReservoir reservoir = new ExponentiallyDecayingReservoir(10, + final Reservoir reservoir = reservoirFactory.create(10, 0.015, clock); @@ -128,7 +174,7 @@ public void longPeriodsOfInactivity_fetchShouldResample() { @Test public void emptyReservoirSnapshot_shouldReturnZeroForAllValues() { - final ExponentiallyDecayingReservoir reservoir = new ExponentiallyDecayingReservoir(100, 0.015, + final Reservoir reservoir = reservoirFactory.create(100, 0.015, new ManualClock()); Snapshot snapshot = reservoir.getSnapshot(); @@ -141,7 +187,7 @@ public void emptyReservoirSnapshot_shouldReturnZeroForAllValues() { @Test public void removeZeroWeightsInSamplesToPreventNaNInMeanValues() { final ManualClock clock = new ManualClock(); - final ExponentiallyDecayingReservoir reservoir = new ExponentiallyDecayingReservoir(1028, 0.015, clock); + final Reservoir reservoir = reservoirFactory.create(1028, 0.015, clock); Timer timer = new Timer(reservoir, clock); Context context = timer.time(); @@ -168,7 +214,7 @@ public void multipleUpdatesAfterlongPeriodsOfInactivityShouldNotCorruptSamplingS // Run the test several times. 
for (int attempt = 0; attempt < 10; attempt++) { final ManualClock clock = new ManualClock(); - final ExponentiallyDecayingReservoir reservoir = new ExponentiallyDecayingReservoir(10, + final Reservoir reservoir = reservoirFactory.create(10, 0.015, clock); @@ -253,7 +299,7 @@ public void multipleUpdatesAfterlongPeriodsOfInactivityShouldNotCorruptSamplingS @Test public void spotLift() { final ManualClock clock = new ManualClock(); - final ExponentiallyDecayingReservoir reservoir = new ExponentiallyDecayingReservoir(1000, + final Reservoir reservoir = reservoirFactory.create(1000, 0.015, clock); @@ -279,7 +325,7 @@ public void spotLift() { @Test public void spotFall() { final ManualClock clock = new ManualClock(); - final ExponentiallyDecayingReservoir reservoir = new ExponentiallyDecayingReservoir(1000, + final Reservoir reservoir = reservoirFactory.create(1000, 0.015, clock); @@ -305,7 +351,7 @@ public void spotFall() { @Test public void quantiliesShouldBeBasedOnWeights() { final ManualClock clock = new ManualClock(); - final ExponentiallyDecayingReservoir reservoir = new ExponentiallyDecayingReservoir(1000, + final Reservoir reservoir = reservoirFactory.create(1000, 0.015, clock); for (int i = 0; i < 40; i++) { @@ -340,9 +386,7 @@ public void clockWrapShouldNotRescale() { private void testShortPeriodShouldNotRescale(long startTimeNanos) { final ManualClock clock = new ManualClock(startTimeNanos); - final ExponentiallyDecayingReservoir reservoir = new ExponentiallyDecayingReservoir(10, - 1, - clock); + final Reservoir reservoir = reservoirFactory.create(10, 1, clock); reservoir.update(1000); assertThat(reservoir.getSnapshot().size()).isEqualTo(1); @@ -360,7 +404,7 @@ private void testShortPeriodShouldNotRescale(long startTimeNanos) { assertThat(snapshot.size()).isEqualTo(1); } - private static void assertAllValuesBetween(ExponentiallyDecayingReservoir reservoir, + private static void assertAllValuesBetween(Reservoir reservoir, double min, double max) { for (double 
i : reservoir.getSnapshot().getValues()) {