Implement LockFreeExponentiallyDecayingReservoir #1656

Merged
@@ -1,6 +1,8 @@
package com.codahale.metrics.benchmarks;

import com.codahale.metrics.ExponentiallyDecayingReservoir;
import com.codahale.metrics.LockFreeExponentiallyDecayingReservoir;
import com.codahale.metrics.Reservoir;
import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
import com.codahale.metrics.SlidingTimeWindowReservoir;
import com.codahale.metrics.SlidingWindowReservoir;
@@ -23,6 +25,7 @@ public class ReservoirBenchmark {

private final UniformReservoir uniform = new UniformReservoir();
private final ExponentiallyDecayingReservoir exponential = new ExponentiallyDecayingReservoir();
private final Reservoir lockFreeExponential = LockFreeExponentiallyDecayingReservoir.builder().build();
private final SlidingWindowReservoir sliding = new SlidingWindowReservoir(1000);
private final SlidingTimeWindowReservoir slidingTime = new SlidingTimeWindowReservoir(200, TimeUnit.MILLISECONDS);
private final SlidingTimeWindowArrayReservoir arrTime = new SlidingTimeWindowArrayReservoir(200, TimeUnit.MILLISECONDS);
@@ -60,6 +63,12 @@ public Object perfSlidingTimeWindowReservoir() {
return slidingTime;
}

@Benchmark
public Object perfLockFreeExponentiallyDecayingReservoir() {
lockFreeExponential.update(nextValue);
return lockFreeExponential;
}

public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(".*" + ReservoirBenchmark.class.getSimpleName() + ".*")
@@ -0,0 +1,270 @@
package com.codahale.metrics;

import com.codahale.metrics.WeightedSnapshot.WeightedSample;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiConsumer;

/**
* A lock-free exponentially-decaying random reservoir of {@code long}s. Uses Cormode et al's
* forward-decaying priority reservoir sampling method to produce a statistically representative
* sampling reservoir, exponentially biased towards newer entries.
*
* @see <a href="http://dimacs.rutgers.edu/~graham/pubs/papers/fwddecay.pdf">
* Cormode et al. Forward Decay: A Practical Time Decay Model for Streaming Systems. ICDE '09:
* Proceedings of the 2009 IEEE International Conference on Data Engineering (2009)</a>
*
* {@link LockFreeExponentiallyDecayingReservoir} is based closely on the {@link ExponentiallyDecayingReservoir};
* however, it provides looser guarantees while completely avoiding locks.
*
* Looser guarantees:
* <ul>
* <li> Updates which occur concurrently with rescaling may be discarded if the orphaned state node is updated after
* rescale has replaced it. This becomes more likely as the rescale interval is reduced, due to the increased
* frequency of rescaling. {@link #rescaleThresholdNanos} values below 30 seconds are not recommended.
* <li> Given a small rescale threshold, updates may attempt to rescale into a new bucket, but lose the CAS race
* and update into a newer bucket than expected. In these cases the measurement weight is reduced accordingly.
* <li>In the worst case, all concurrent threads updating the reservoir may attempt to rescale rather than
* a single thread holding an exclusive write lock. It's expected that the configuration is set such that
* rescaling is substantially less common than updating at peak load. Even so, when size is reasonably small
* it can be more efficient to rescale than to park and context switch.
* </ul>
*
* @author <a href="mailto:ckozak@ckozak.net">Carter Kozak</a>
*/
public final class LockFreeExponentiallyDecayingReservoir implements Reservoir {

private static final double SECONDS_PER_NANO = .000_000_001D;
private static final AtomicReferenceFieldUpdater<LockFreeExponentiallyDecayingReservoir, State> stateUpdater =
AtomicReferenceFieldUpdater.newUpdater(LockFreeExponentiallyDecayingReservoir.class, State.class, "state");

private final int size;
private final long rescaleThresholdNanos;
private final Clock clock;

private volatile State state;

private static final class State {

private static final AtomicIntegerFieldUpdater<State> countUpdater =
AtomicIntegerFieldUpdater.newUpdater(State.class, "count");

private final double alphaNanos;
private final int size;
private final long startTick;
// Count is updated after samples are successfully added to the map.
private final ConcurrentSkipListMap<Double, WeightedSample> values;

private volatile int count;

State(
double alphaNanos,
int size,
long startTick,
int count,
ConcurrentSkipListMap<Double, WeightedSample> values) {
this.alphaNanos = alphaNanos;
this.size = size;
this.startTick = startTick;
this.values = values;
this.count = count;
}

private void update(long value, long timestampNanos) {
double itemWeight = weight(timestampNanos - startTick);
double priority = itemWeight / ThreadLocalRandom.current().nextDouble();
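// Forward-decay priority: the sample weight divided by a uniform random double.
// Heavier (newer) samples are more likely to be retained; the map's smallest key
// is always the eviction candidate.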
boolean mapIsFull = count >= size;
if (!mapIsFull || values.firstKey() < priority) {
Reviewer:
Two thoughts:

Maybe this should be split into two cases to avoid passing in bypassIncrement:

if (count < size) {
  if (values.putIfAbsent(priority, new WeightedSample(value, itemWeight)) == null) {
    countUpdater.incrementAndGet(this);
  }
} else if (values.firstKey() < priority) {
  if (values.putIfAbsent(priority, new WeightedSample(value, itemWeight)) == null) {
    values.pollFirstEntry();
  }
}

I think there might be a race condition here though (both before and after my suggestion):
multiple threads can read count, see that the map is not full, add an element, and then increment the counter, so the map becomes too big.
In subsequent writes the map will then remain at that size. Perhaps that's not a problem in practice.

I am not sure exactly how that can be fixed - perhaps you could remove more than one element if the map is too big and decrement the count? But that might lead to other race conditions instead.

Maybe this would work?

if (count < size) {
  if (values.putIfAbsent(priority, new WeightedSample(value, itemWeight)) == null) {
    countUpdater.incrementAndGet(this);
  }
} else if (values.firstKey() < priority) {
  if (values.putIfAbsent(priority, new WeightedSample(value, itemWeight)) == null) {
    values.pollFirstEntry();
  }
  while (count > size) {
    if (countUpdater.compareAndSet(this, count, count - 1)) {
      values.pollFirstEntry();
    }
  }
}

Reviewer:

Ah no, that might not work...

> Atomically sets the field of the given object managed by this updater to the given updated value if the current value == the expected value. This method is guaranteed to be atomic with respect to other calls to compareAndSet and set, but not necessarily with respect to other changes in the field.

Contributor Author:

I can test with two addSample implementations where the common path (map is already full) avoids dealing with the counter entirely. It's not clear whether that would produce better performance; it's likely impacted by method size. I can try to keep update and the addSample* methods below the 35-byte inlining threshold to see what happens.

> Multiple threads can read count, see that the map is not full, add an element, and then increment the counter, so the map becomes too big.

I think this case is expected: the count value may race to increment beyond size, but the map itself doesn't exceed size at steady state, only transiently while new values are added before old values are removed.
I think there is a race between the value-replacement path and rescale, in which rescale adds both the new and the old element before an update can invoke values.pollFirstEntry(). The fix should be easy enough: run pollFirstEntry until the rescaled state has an acceptable count.
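Concretely, that fix matches the loop in the final rescale implementation below:

while (newCount > size) {
  newValues.pollFirstEntry();
  newCount--;
}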

We could update the count increment to avoid exceeding size, but I'm not sure it would buy us anything (and it would be difficult to take advantage of weak-CAS operations for architectures like ARM where CAS is more expensive):

/** Returns true if count was modified, false if the value is already equal to the maximum size. */
boolean incrementCount() {
  // hoist size to avoid multiple fetches, see -XX:+TrustFinalNonStaticFields
  int max = this.size;
  while (true) {
    int countSnapshot = this.count;
    if (countSnapshot < max) {
      if (countUpdater.compareAndSet(this, countSnapshot, countSnapshot + 1)) {
        return true;
      } // else load a new countSnapshot and try again
    } else {
      return false;
    }
  }
}

Reviewer:

Since we only need to call that method for the first size calls, it would not be a big performance issue over time.

However, perhaps it's not very important to prevent the stable map size from occasionally exceeding the maximum specified size.

I am still not sure it would be a bad idea to prefill the map with dummy values to avoid this problem. Then we could get rid of the count variable altogether (the size() method is part of the interface, but is never actually called in practice, so it could be implemented with an expensive scan of the map).
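Something like this minimal sketch (assuming a shared dummy WeightedSample instance and negative keys, which sort below any real positive priority):

ConcurrentSkipListMap<Double, WeightedSample> values = new ConcurrentSkipListMap<>();
WeightedSample dummy = new WeightedSample(0L, 0.0); // shared placeholder with zero weight
for (int i = 0; i < size; i++) {
  // Real priorities are positive, so the dummy entries always sort first and are evicted first.
  values.put(-1.0 - i, dummy);
}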

Contributor Author:

> However, perhaps it's not very important to prevent the stable map size from occasionally exceeding the maximum specified size.

When we use the increment (which can racily exceed size), we use its result via the countUpdater.incrementAndGet(this) > size check to reduce the size of the map again, so the map itself doesn't have a race; there's just a chance that the count value may be incremented beyond size.

> I am still not sure it would be a bad idea to prefill the map with dummy values to avoid this problem. Then we could get rid of the count variable altogether (the size() method is part of the interface, but is never actually called in practice, so it could be implemented with an expensive scan of the map).

If there's a performance benefit, I'm happy to try it. I'm worried because there are a couple buckets of common reservoir uses:

  1. Metric on a relatively hot path: In this case the reservoir is filled quickly and generally follows all the fast paths (most updates are ignored due to low priority)
  2. Metric is created and never updated (feature flagged off code path)

In the second category, creating reservoir maps with 1028 (default) nodes is relatively expensive on heap. I think we would want a strong performance argument to justify the cost. What do you think?

Reviewer:

I think this PR is good as it is; I can't see any real practical issues with it.

Creating a map with 1028 entries with different double keys but all pointing to the same value instance doesn't seem too expensive to me, but I don't mind keeping it like this. I just have a slight preference for optimizing performance and code readability for the high-traffic use case (where performance is more important anyway).

Contributor Author:

I'll generate some data so we can make an informed decision :-)

Contributor Author:
The reason will be displayed to describe this comment to others. Learn more.

Using the IntelliJ memory agent included in the IDEA IDE, I ran a simple main method in the debugger:

    public static void main(String[] _args) {
        Reservoir reservoir = LockFreeExponentiallyDecayingReservoir.builder().build();
        System.out.println("Breakpoint 1");
        // Additional samples to avoid any potential jitter from unlikely priority collision
        for (int i = 0; i < 5000; i++) {
            reservoir.update(1L);
        }
        System.out.println("Breakpoint 2");
    }

At Breakpoint 1 the reservoir has a retained size of 120 bytes.
At Breakpoint 2 the reservoir has a retained size of 94.97 kB (really looking forward to Valhalla to reduce this!).

We can optimize (as you have demonstrated) using a singleton WeightedSample instead of creating size new instances, which brings us down to 63.94 kB per unused reservoir.
We can further optimize by pre-calculating boxed doubles (assuming the reservoir uses our default size or fewer samples):

    private static final Double[] VALUES = IntStream.range(0, 1028).mapToObj(value -> -1D - value).toArray(Double[]::new);

This brings retained heap to 36.61 kB per unused reservoir, based on the internal nodes used by the ConcurrentSkipListMap.

That's a significant chunk of heap! Perhaps I can measure it with a pre-release Valhalla JDK later today for comparison :-)

Reviewer:

Good analysis! I think we can skip that for now. Perhaps I can try to make it configurable later in some way, so people have a choice between a less memory-intensive version for reservoirs with very few samples and one that may be faster for fully-filled reservoirs.

As I said before, I think the PR is good to go (IMO)

addSample(priority, value, itemWeight, mapIsFull);
}
}

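// bypassIncrement is true when the caller observed a full map: an existing entry
// must be displaced, so the lowest-priority entry is polled without touching the
// counter. Otherwise the counter is incremented, and an entry is evicted only once
// the count exceeds the configured size.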
private void addSample(double priority, long value, double itemWeight, boolean bypassIncrement) {
if (values.putIfAbsent(priority, new WeightedSample(value, itemWeight)) == null
&& (bypassIncrement || countUpdater.incrementAndGet(this) > size)) {
values.pollFirstEntry();
}
}

/* "A common feature of the above techniques—indeed, the key technique that
* allows us to track the decayed weights efficiently—is that they maintain
* counts and other quantities based on g(ti − L), and only scale by g(t − L)
* at query time. But while g(ti −L)/g(t−L) is guaranteed to lie between zero
* and one, the intermediate values of g(ti − L) could become very large. For
* polynomial functions, these values should not grow too large, and should be
* effectively represented in practice by floating point values without loss of
* precision. For exponential functions, these values could grow quite large as
* new values of (ti − L) become large, and potentially exceed the capacity of
* common floating point types. However, since the values stored by the
* algorithms are linear combinations of g values (scaled sums), they can be
* rescaled relative to a new landmark. That is, by the analysis of exponential
* decay in Section III-A, the choice of L does not affect the final result. We
* can therefore multiply each value based on L by a factor of exp(−α(L′ − L)),
* and obtain the correct value as if we had instead computed relative to a new
* landmark L′ (and then use this new L′ at query time). This can be done with
* a linear pass over whatever data structure is being used."
*/
State rescale(long newTick) {
long durationNanos = newTick - startTick;
double scalingFactor = Math.exp(-alphaNanos * durationNanos);
int newCount = 0;
ConcurrentSkipListMap<Double, WeightedSample> newValues = new ConcurrentSkipListMap<>();
if (Double.compare(scalingFactor, 0) != 0) {
RescalingConsumer consumer = new RescalingConsumer(scalingFactor, newValues);
values.forEach(consumer);
// make sure the counter is in sync with the number of stored samples.
newCount = consumer.count;
}
// It's possible that more values were added while the map was scanned, those with the
// minimum priorities are removed.
while (newCount > size) {
Objects.requireNonNull(newValues.pollFirstEntry(), "Expected an entry");
newCount--;
}
return new State(alphaNanos, size, newTick, newCount, newValues);
}

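// Forward-decay weight relative to the current landmark (startTick): exp(alpha * (t - L)).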
private double weight(long durationNanos) {
return Math.exp(alphaNanos * durationNanos);
}
}

private static final class RescalingConsumer implements BiConsumer<Double, WeightedSample> {
private final double scalingFactor;
private final ConcurrentSkipListMap<Double, WeightedSample> values;
private int count;

RescalingConsumer(double scalingFactor, ConcurrentSkipListMap<Double, WeightedSample> values) {
this.scalingFactor = scalingFactor;
this.values = values;
}

@Override
public void accept(Double priority, WeightedSample sample) {
double newWeight = sample.weight * scalingFactor;
if (Double.compare(newWeight, 0) == 0) {
return;
}
WeightedSample newSample = new WeightedSample(sample.value, newWeight);
if (values.put(priority * scalingFactor, newSample) == null) {
count++;
}
}
}

private LockFreeExponentiallyDecayingReservoir(int size, double alpha, Duration rescaleThreshold, Clock clock) {
// Scale alpha to nanoseconds
double alphaNanos = alpha * SECONDS_PER_NANO;
this.size = size;
this.clock = clock;
this.rescaleThresholdNanos = rescaleThreshold.toNanos();
this.state = new State(alphaNanos, size, clock.getTick(), 0, new ConcurrentSkipListMap<>());
}

@Override
public int size() {
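// The count may racily exceed size (see the review discussion above), so clamp the reported value.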
return Math.min(size, state.count);
}

@Override
public void update(long value) {
long now = clock.getTick();
rescaleIfNeeded(now).update(value, now);
}

private State rescaleIfNeeded(long currentTick) {
// This method is optimized for size so the check may be quickly inlined.
// Rescaling occurs substantially less frequently than the check itself.
State stateSnapshot = this.state;
if (currentTick - stateSnapshot.startTick >= rescaleThresholdNanos) {
return doRescale(currentTick, stateSnapshot);
}
return stateSnapshot;
}

private State doRescale(long currentTick, State stateSnapshot) {
State newState = stateSnapshot.rescale(currentTick);
if (stateUpdater.compareAndSet(this, stateSnapshot, newState)) {
// newState successfully installed
return newState;
}
// Otherwise another thread has won the race and we can return the result of a volatile read.
// It's possible this has taken so long that another update is required, however that's unlikely
// and no worse than the standard race between a rescale and update.
return this.state;
}

@Override
public Snapshot getSnapshot() {
State stateSnapshot = rescaleIfNeeded(clock.getTick());
return new WeightedSnapshot(stateSnapshot.values.values());
}

public static Builder builder() {
return new Builder();
}

/**
* By default this uses a size of 1028 elements, which offers a 99.9%
* confidence level with a 5% margin of error assuming a normal distribution, and an alpha
* factor of 0.015, which heavily biases the reservoir to the past 5 minutes of measurements.
*/
public static final class Builder {
private static final int DEFAULT_SIZE = 1028;
private static final double DEFAULT_ALPHA = 0.015D;
private static final Duration DEFAULT_RESCALE_THRESHOLD = Duration.ofHours(1);

private int size = DEFAULT_SIZE;
private double alpha = DEFAULT_ALPHA;
private Duration rescaleThreshold = DEFAULT_RESCALE_THRESHOLD;
private Clock clock = Clock.defaultClock();

private Builder() {}

/**
* Maximum number of samples to keep in the reservoir. Once this number is reached older samples are
* replaced (based on weight, with some amount of random jitter).
*/
public Builder size(int value) {
if (value <= 0) {
throw new IllegalArgumentException(
"LockFreeExponentiallyDecayingReservoir size must be positive: " + value);
}
this.size = value;
return this;
}

/**
* Alpha is the exponential decay factor. Higher values bias results more heavily toward newer values.
*/
public Builder alpha(double value) {
this.alpha = value;
return this;
}

/**
* Interval at which this reservoir is rescaled.
*/
public Builder rescaleThreshold(Duration value) {
this.rescaleThreshold = Objects.requireNonNull(value, "rescaleThreshold is required");
return this;
}

/**
* Clock instance used for decay.
*/
public Builder clock(Clock value) {
this.clock = Objects.requireNonNull(value, "clock is required");
return this;
}

public Reservoir build() {
return new LockFreeExponentiallyDecayingReservoir(size, alpha, rescaleThreshold, clock);
}
}
}
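For reference, a minimal usage sketch of the builder API introduced in this PR (the parameter values shown are the documented defaults):

Reservoir reservoir = LockFreeExponentiallyDecayingReservoir.builder()
    .size(1028)                            // maximum number of retained samples
    .alpha(0.015)                          // exponential decay factor
    .rescaleThreshold(Duration.ofHours(1)) // how often sample weights are renormalized
    .build();
reservoir.update(42L);
Snapshot snapshot = reservoir.getSnapshot();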