Implement LockFreeExponentiallyDecayingReservoir #1656
Conversation
I'm not sure if this should have gone toward the 5.x branch instead; I defer to your judgement. This could be considered an update to the existing `ExponentiallyDecayingReservoir`.
Force-pushed from ddd5e38 to ea26b62
@carterkozak Thanks for this interesting contribution! Could you please rebase this PR on the […] branch? In your opinion, would this PR replace #1638?
Of course, I'd be happy to.
I believe so. However, that PR replaces the ExponentiallyDecayingReservoir, whereas this one adds a new Reservoir implementation -- I can update this design to replace the ExponentiallyDecayingReservoir with the exception of an […]
This implementation has several advantages over the existing ExponentiallyDecayingReservoir:

* It exclusively uses the precise system clock (nanotime/clock.tick) instead of a combination of nanotime and currentTimeMillis, so it's not vulnerable to unexpected NTP clock jumps.
* Lock free, for substantially better performance under concurrent load [1] and improved performance in uncontended use [2].
* Allows the rescale threshold to be configured programmatically.

Potential trade-offs:

* Updates which occur concurrently with rescaling may be discarded if the orphaned state node is updated after rescale has replaced it.
* In the worst case, all concurrent threads updating the reservoir may attempt to rescale rather than a single thread holding an exclusive write lock. It's expected that the configuration is set such that rescaling is substantially less common than updating at peak load.

[1] Substantially better performance under concurrent load (32 concurrent update threads):

```
Benchmark                            (reservoirType)       Mode  Cnt     Score      Error  Units
ReservoirBenchmarks.updateReservoir  EXPO_DECAY            avgt    5  8235.861 ± 1306.404  ns/op
ReservoirBenchmarks.updateReservoir  LOCK_FREE_EXPO_DECAY  avgt    5   758.315 ±   36.916  ns/op
```

[2] Improved performance in uncontended use (1 benchmark thread):

```
Benchmark                            (reservoirType)       Mode  Cnt   Score    Error  Units
ReservoirBenchmarks.updateReservoir  EXPO_DECAY            avgt    5  92.845 ± 36.478  ns/op
ReservoirBenchmarks.updateReservoir  LOCK_FREE_EXPO_DECAY  avgt    5  49.168 ±  1.033  ns/op
```
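For context, configuring the rescale threshold programmatically looks roughly like this. This is only a sketch: `builder()` and `build()` appear later in this thread, but the `size`, `alpha`, and `rescaleThreshold` method names and default values shown are assumptions, not confirmed by this PR.

```java
import java.time.Duration;

// Sketch only; builder method names and defaults are assumed.
Reservoir reservoir = LockFreeExponentiallyDecayingReservoir.builder()
        .size(1028)                             // samples retained (assumed default)
        .alpha(0.015)                           // exponential decay factor (assumed default)
        .rescaleThreshold(Duration.ofHours(1))  // programmatically configured rescale interval
        .build();
```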
Force-pushed from ea26b62 to 74b54e1
```java
private double weight(long durationNanos) {
    double durationSeconds = durationNanos / 1_000_000_000D;
    return Math.exp(alpha * durationSeconds);
}
```
Note that this differs from the original ExponentiallyDecayingReservoir by avoiding a cliff every second. Nanosecond values are converted to seconds as a double for smooth scaling.
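To illustrate the cliff (a toy comparison, not code from either implementation; the truncating line mimics the original reservoir's whole-second weighting in spirit only):

```java
long durationNanos = 1_999_000_000L; // just under two seconds
// Whole-second truncation: the weight exponent jumps from 1 to 2 at the 2s boundary.
double cliff = Math.exp(alpha * (durationNanos / 1_000_000_000L));  // exponent uses 1
// Double conversion: the exponent scales smoothly through the boundary.
double smooth = Math.exp(alpha * (durationNanos / 1_000_000_000D)); // exponent uses 1.999
```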
```java
 */
State rescale(long newTick) {
    long durationNanos = newTick - startTick;
    double durationSeconds = durationNanos / 1_000_000_000D;
```
Nit: precompute a factor to multiply by, to avoid the division.
Good point, I can replace both instances of `durationNanos / 1_000_000_000D` with `durationNanos * 0.000000001`.
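As a sketch of that change (the constant name is illustrative only):

```java
// Multiplying by the precomputed reciprocal avoids the comparatively slow divide.
// 1e-9 is not exactly representable as a double, so the result may differ from
// division in the final bits, which is irrelevant for sampling weights.
private static final double SECONDS_PER_NANO = 0.000000001;

private double weight(long durationNanos) {
    return Math.exp(alpha * (durationNanos * SECONDS_PER_NANO));
}
```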
I agree this change could replace #1638; they seem to solve the same problem. My only concern is that this solution might lose more data points during a rescale than #1638. If this PR is merged, I can rebase my change on it and see if it still makes sense in some form.
In both implementations we cannot guarantee events are not lost. Given that this is a sampling implementation, I opted for simplicity in the […]
That is correct, by design (not to suggest my design is ideal 😄). We may spend more cycles rescaling under concurrent load, but we don't have to worry about failures leaving the reservoir in a degraded state. Rescale is both infrequent and inexpensive enough that I expect the cost to be negligible.

Related thought (probably best for another PR, if it's worthwhile at all): most applications I work on create the vast majority of Histograms and Timers on startup, around the same time, and in the worst case have constant use. This results in a concurrent rescale of every reservoir at the same time. Perhaps we should add some jitter to prevent load spikes?
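A rough sketch of the jitter idea (hypothetical, not part of this PR; field names follow the snippets elsewhere in this thread):

```java
// Hypothetical: offset each reservoir's rescale deadline by up to 10% of the
// threshold so reservoirs created together at startup don't all rescale at once.
long jitterNanos = ThreadLocalRandom.current().nextLong(rescaleThresholdNanos / 10);
long nextRescaleTick = startTick + rescaleThresholdNanos + jitterNanos;
```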
Avoid expensive division operations on each rescale and update
```java
double priority = itemWeight / ThreadLocalRandom.current().nextDouble();

long newCount = count.incrementAndGet();
if (newCount <= size || values.isEmpty()) {
```
I don't understand why the isEmpty check is needed
`count.incrementAndGet()` is invoked prior to adding elements to the map. Let's assume a small maximum reservoir size (though the same thing can happen with high concurrency):

* Max is one.
* Two threads call `histogram.update(value)`.
* Thread 1: `incrementAndGet` returns 1, so `newCount <= size` is true; the thread prepares to call `values.put`, but is descheduled (or thread 2 runs faster).
* Thread 2: `incrementAndGet` returns 2, so `newCount <= size` is false. `values.firstKey()` returns null and the comparison throws an NPE.
I suppose this could be restructured to avoid the `isEmpty` check and use a null check on `first` instead. What do you think?
I don't think firstKey can return null; it would throw an exception if the map is empty.
Maybe something like this should be used instead?
https://docs.oracle.com/javase/8/docs/api/java/util/NavigableMap.html#pollFirstEntry--
I tested a slightly simplified implementation that uses the count, updated after a successful `putIfAbsent`:
```java
private void update(long value, long timestampNanos) {
    double itemWeight = weight(timestampNanos - startTick);
    double priority = itemWeight / ThreadLocalRandom.current().nextDouble();
    if (values.putIfAbsent(priority, new WeightedSample(value, itemWeight)) == null
            && count.incrementAndGet() > size) {
        values.pollFirstEntry();
    }
}
```
Performance suffers compared to the original implementation:

```
Benchmark                            (reservoirType)       Mode  Cnt      Score     Error  Units
ReservoirBenchmarks.updateReservoir  EXPO_DECAY            avgt    5   8620.807 ± 543.252  ns/op
ReservoirBenchmarks.updateReservoir  LOCK_FREE_EXPO_DECAY  avgt    5  20105.586 ± 982.248  ns/op
```
I think this is due to removal of the fast path in which no write operation is attempted on the map when the new priority does not exceed the minimum priority in the map.
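In other words, the guard looks roughly like this (a sketch using the fields from the snippets above):

```java
// Fast path: when the reservoir is full and the candidate priority does not beat
// the current minimum, return without any map mutation.
if (count >= size && values.firstKey() >= priority) {
    return;
}
```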
Right, maybe it would be possible to keep the fast path but still keep the size invariant
I think you're right that we can optimize this using pollFirstEntry. Might be helpful to move `pollFirstEntry` into a small method to help escape analysis avoid an `AbstractMap.SimpleImmutableEntry` allocation.
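Something like this hypothetical helper (the name is illustrative):

```java
// A tiny method is cheap to inline, and since the Map.Entry returned by
// pollFirstEntry never escapes, escape analysis can scalar-replace the
// AbstractMap.SimpleImmutableEntry allocation.
private void removeLowestPriority() {
    values.pollFirstEntry();
}
```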
```diff
  // Always remove an item
- while (values.remove(first) == null) {
-     first = values.firstKey();
- }
+ values.pollFirstEntry();
```
I made this branch: https://github.com/spkrka/metrics/tree/krka/poll-first-entry
Figured out how to make a PR... https://github.com/carterkozak/metrics/pull/1/files
@carterkozak I made this commit with my idea: spkrka@08f16ac (Didn't manage to figure out how to make a PR for your branch - if you like it you can just take it, no need to preserve author)
```
Benchmark                            (reservoirType)       Mode  Cnt     Score      Error  Units
ReservoirBenchmarks.updateReservoir  EXPO_DECAY            avgt    5  8365.308 ± 1880.683  ns/op
ReservoirBenchmarks.updateReservoir  LOCK_FREE_EXPO_DECAY  avgt    5    73.966 ±    5.305  ns/op
```
I need to update the benchmark results. Single-threaded results have improved ~20% and concurrent load results have improved by an order of magnitude from ~700ns/op to ~70ns/op!
This makes the check more likely to be optimized without forcing analysis of the less common doRescale path.

Before:

```
private LockFreeExponentiallyDecayingReservoir$State rescaleIfNeeded(long)
     0: aload_0
     1: getfield      #56  // Field state:Lcom/codahale/metrics/LockFreeExponentiallyDecayingReservoir$State;
     4: astore_3
     5: aload_3
     6: invokestatic  #81  // Method com/codahale/metrics/LockFreeExponentiallyDecayingReservoir$State.access$600:(Lcom/codahale/metrics/LockFreeExponentiallyDecayingReservoir$State;)J
     9: lstore        4
    11: lload_1
    12: lload         4
    14: lsub
    15: aload_0
    16: getfield      #36  // Field rescaleThresholdNanos:J
    19: lcmp
    20: iflt          51
    23: aload_3
    24: lload_1
    25: invokevirtual #85  // Method com/codahale/metrics/LockFreeExponentiallyDecayingReservoir$State.rescale:(J)Lcom/codahale/metrics/LockFreeExponentiallyDecayingReservoir$State;
    28: astore        6
    30: getstatic     #88  // Field stateUpdater:Ljava/util/concurrent/atomic/AtomicReferenceFieldUpdater;
    33: aload_0
    34: aload_3
    35: aload         6
    37: invokevirtual #92  // Method java/util/concurrent/atomic/AtomicReferenceFieldUpdater.compareAndSet:(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Z
    40: ifeq          46
    43: aload         6
    45: areturn
    46: aload_0
    47: getfield      #56  // Field state:Lcom/codahale/metrics/LockFreeExponentiallyDecayingReservoir$State;
    50: areturn
    51: aload_3
    52: areturn
```

After:

```
private LockFreeExponentiallyDecayingReservoir$State rescaleIfNeeded(long)
     0: aload_0
     1: getfield      #56  // Field state:Lcom/codahale/metrics/LockFreeExponentiallyDecayingReservoir$State;
     4: astore_3
     5: lload_1
     6: aload_3
     7: invokestatic  #81  // Method com/codahale/metrics/LockFreeExponentiallyDecayingReservoir$State.access$600:(Lcom/codahale/metrics/LockFreeExponentiallyDecayingReservoir$State;)J
    10: lsub
    11: aload_0
    12: getfield      #36  // Field rescaleThresholdNanos:J
    15: lcmp
    16: iflt          26
    19: aload_0
    20: lload_1
    21: aload_3
    22: invokespecial #85  // Method doRescale:(JLcom/codahale/metrics/LockFreeExponentiallyDecayingReservoir$State;)Lcom/codahale/metrics/LockFreeExponentiallyDecayingReservoir$State;
    25: areturn
    26: aload_3
    27: areturn
```
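Reconstructed from the disassembly above, the refactor is roughly the following (field and method names are taken from the bytecode comments, not the verbatim source):

```java
private State rescaleIfNeeded(long currentTick) {
    State stateSnapshot = this.state;
    // Common case: threshold not reached; the small body keeps this check inline-friendly.
    if (currentTick - stateSnapshot.startTick >= rescaleThresholdNanos) {
        // Rare slow path moved out-of-line so it doesn't inflate this method.
        return doRescale(currentTick, stateSnapshot);
    }
    return stateSnapshot;
}
```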
had a couple nits while looking through
```
Benchmark                            (reservoirType)       Mode  Cnt     Score      Error  Units
ReservoirBenchmarks.updateReservoir  EXPO_DECAY            avgt    5  8309.300 ± 1900.398  ns/op
ReservoirBenchmarks.updateReservoir  LOCK_FREE_EXPO_DECAY  avgt    5    70.028 ±    0.887  ns/op
```
I've updated the benchmark results in the PR description.
Latest set of changes from dropwizard/metrics#1656

Benchmark results with 32 threads:

```
Benchmark                            (reservoirType)       Mode  Cnt     Score      Error  Units
ReservoirBenchmarks.updateReservoir  EXPO_DECAY            avgt    5  8362.404 ± 1556.259  ns/op
ReservoirBenchmarks.updateReservoir  LOCK_FREE_EXPO_DECAY  avgt    5    68.769 ±    1.325  ns/op
```

Benchmark results with a single thread:

```
Benchmark                            (reservoirType)       Mode  Cnt   Score   Error  Units
ReservoirBenchmarks.updateReservoir  EXPO_DECAY            avgt    5  82.767 ± 1.393  ns/op
ReservoirBenchmarks.updateReservoir  LOCK_FREE_EXPO_DECAY  avgt    5  40.086 ± 0.330  ns/op
```
```java
double itemWeight = weight(timestampNanos - startTick);
double priority = itemWeight / ThreadLocalRandom.current().nextDouble();
boolean mapIsFull = count >= size;
if (!mapIsFull || values.firstKey() < priority) {
```
Two thoughts:

Maybe this should be split into two cases to avoid passing in bypassIncrement:

```java
if (count < size) {
    if (values.putIfAbsent(priority, new WeightedSample(value, itemWeight)) == null) {
        countUpdater.incrementAndGet(this);
    }
} else if (values.firstKey() < priority) {
    if (values.putIfAbsent(priority, new WeightedSample(value, itemWeight)) == null) {
        values.pollFirstEntry();
    }
}
```
I think there might be a race condition here though (both before and after my suggestion).
Multiple threads can read count, see that the map is not full, add the element and then increment the counter, so the map becomes too big.
Then in subsequent writes, the map will remain at that size. Perhaps that's not a problem in practice.
I am not sure exactly how that can be fixed - perhaps you could remove more than one element if the map is too big and decrement the count? But that might lead to other race conditions instead.
Maybe this would work?

```java
if (count < size) {
    if (values.putIfAbsent(priority, new WeightedSample(value, itemWeight)) == null) {
        countUpdater.incrementAndGet(this);
    }
} else if (values.firstKey() < priority) {
    if (values.putIfAbsent(priority, new WeightedSample(value, itemWeight)) == null) {
        values.pollFirstEntry();
    }
    while (count > size) {
        if (countUpdater.compareAndSet(this, count, count - 1)) {
            values.pollFirstEntry();
        }
    }
}
```
Ah no, that might not work...

> Atomically sets the field of the given object managed by this updater to the given updated value if the current value == the expected value. This method is guaranteed to be atomic with respect to other calls to compareAndSet and set, but not necessarily with respect to other changes in the field.
I can test with two `addSample` implementations where the common path (map is already full) avoids dealing with the counter entirely. It's not clear if that would produce better performance; it's likely impacted by method size. I can try to keep `update` and `addSample*` below the 35 byte threshold to see what happens.
> Multiple threads can read count, see that the map is not full, add the element and then increment the counter, so the map becomes too big.
I think this case is expected: the `count` value may race to increment and exceed `size`; however, the size of the map doesn't exceed `size` at steady state, only when new values are added prior to removing old values.

I think there is a race between the value replacement path and rescale, in which rescale adds both the new and old element before an update can invoke `values.pollFirstEntry()`. The fix should be easy enough: run pollFirstEntry until the rescaled state has an acceptable count.
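A sketch of that fix (assumed shape, not the PR's exact code):

```java
// After a racy rescale the map may briefly hold more than 'size' samples;
// evict the lowest-priority entries until the invariant is restored. A real
// implementation would consult a tracked count, since ConcurrentSkipListMap.size()
// is an O(n) traversal.
while (values.size() > size) {
    values.pollFirstEntry();
}
```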
We could update the `count` increment to avoid exceeding `size`, but I'm not sure it would buy us anything (and it would be difficult to take advantage of weak-CAS operations for architectures like ARM where CAS is more expensive):
```java
/** Returns true if count was modified, false if the value is already equal to the maximum size. */
boolean incrementCount() {
    // hoist size to avoid multiple fetches, see -XX:+TrustFinalNonStaticFields
    int max = this.size;
    while (true) {
        int countSnapshot = this.count;
        if (countSnapshot < max) {
            if (countUpdater.compareAndSet(this, countSnapshot, countSnapshot + 1)) {
                return true;
            } // else load a new countSnapshot and try again
        } else {
            return false;
        }
    }
}
```
Since we only need to call that method for the first `size` calls, it would not be a big performance issue over time.

However, perhaps it's not very important to prevent the stable map size from sometimes being larger than the maximum specified size.

I am still not sure it would be a bad idea to prefill the map with dummy values to avoid this problem. Then we could get rid of the count variable altogether (the `size()` method is part of the interface, but is never actually called in practice, so it could be implemented with an expensive scan of the map).
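A sketch of the prefill idea (hypothetical, not in this PR; it mirrors the negative-key scheme discussed further down):

```java
// Hypothetical prefill: seed the map with 'size' dummy entries keyed by negative
// priorities, so any real sample (always positive priority) evicts a dummy first.
// The map size then stays constant and no separate count field is needed.
ConcurrentSkipListMap<Double, WeightedSample> values = new ConcurrentSkipListMap<>();
WeightedSample dummy = new WeightedSample(0L, 0.0);
for (int i = 0; i < size; i++) {
    values.put(-1D - i, dummy);
}
```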
> However, perhaps it's not very important to prevent the stable map size from sometimes being larger than the maximum specified size.
When we use the increment (which can racily increment to exceed `size`), we use the result via the `countUpdater.incrementAndGet(this) > size` check to reduce the size of the map again, so the map itself doesn't have a race; there's just a chance that the `count` value may be incremented beyond `size`.
> I am still not sure it would be a bad idea to prefill the map with dummy values to avoid this problem. Then we could get rid of the count variable altogether (the `size()` method is part of the interface, but is never actually called in practice, so it could be implemented with an expensive scan of the map).
If there's a performance benefit, I'm happy to try it. I'm worried because there are a couple of buckets of common reservoir uses:

- Metric on a relatively hot path: in this case the reservoir is filled quickly and generally follows all the fast paths (most updates are ignored due to low priority).
- Metric is created and never updated (feature-flagged-off code path).

In the second category, creating reservoir maps with 1028 (default) nodes is relatively expensive on heap. I think we would want a strong performance argument to justify the cost. What do you think?
I think this PR is good as it is, can't see any real practical issues with it.
Creating a map with 1028 entries with different double keys, but all pointing to the same value instance, doesn't seem too expensive to me, but I don't mind keeping it like this. I just have a slight preference for optimizing performance and code readability for the high-traffic use case (where performance is more important anyway).
I'll generate some data so we can make an informed decision :-)
Using the IntelliJ memory agent included in the IDEA IDE, I ran a simple main method in the debugger:
```java
public static void main(String[] _args) {
    Reservoir reservoir = LockFreeExponentiallyDecayingReservoir.builder().build();
    System.out.println("Breakpoint 1");
    // Additional samples to avoid any potential jitter from unlikely priority collision
    for (int i = 0; i < 5000; i++) {
        reservoir.update(1L);
    }
    System.out.println("Breakpoint 2");
}
```
At `Breakpoint 1` the reservoir has a retained size of 120 bytes. At `Breakpoint 2` the reservoir has a retained size of 94.97 kB (really looking forward to Valhalla to reduce this!).
We can optimize (as you have demonstrated) using a singleton `WeightedSample` instead of creating `size` new instances, which brings us down to 63.94 kB per unused reservoir.

We can further optimize by pre-calculating boxed doubles (assuming the reservoir uses our default number of samples or fewer): `private static final Double[] VALUES = IntStream.range(0, 1028).mapToObj(value -> -1D - value).toArray(Double[]::new);`, bringing retained heap to 36.61 kB per unused reservoir based on the internal nodes used by the ConcurrentSkipListMap.

That's a significant chunk of heap! Perhaps I can measure it with a pre-release Valhalla JDK later today for comparison :-)
Good analysis! I think we can skip that for now. Perhaps I can try to make it configurable later in some way, so people have a choice between a less memory-intensive version for reservoirs with very few samples and one that may be faster for fully filled reservoirs.

As I said before, I think the PR is good to go (IMO).
Results with 32 concurrent threads on a 14c/28t Xeon W-2175:

```
Benchmark                                                      Mode  Cnt   Score    Error  Units
ReservoirBenchmark.perfExponentiallyDecayingReservoir          avgt    4   8.817 ±  0.310  us/op
ReservoirBenchmark.perfLockFreeExponentiallyDecayingReservoir  avgt    4   0.076 ±  0.002  us/op
ReservoirBenchmark.perfSlidingTimeWindowArrayReservoir         avgt    4  14.890 ±  0.489  us/op
ReservoirBenchmark.perfSlidingTimeWindowReservoir              avgt    4  39.066 ± 27.583  us/op
ReservoirBenchmark.perfSlidingWindowReservoir                  avgt    4   4.257 ±  0.187  us/op
ReservoirBenchmark.perfUniformReservoir                        avgt    4   0.704 ±  0.040  us/op
```
…ks/ReservoirBenchmark.java
Code taken from dropwizard/metrics#1656 with permission from the author. See: #85. Replace this with the class from the dropwizard dependency once it has been released.
@carterkozak @schlosna @spkrka Thank you very, very much for this contribution and the subsequent discussion and review! ❤️
The PR looks good to me and I'm going to finally merge it. 😉
Would it be possible to also get this into 4.1.x and get a release? No urgency from my side, would just be nice to clean up some manual inlining on my end.
@spkrka I'd like to keep it in Metrics 4.2.x. That being said, we'll probably publish a first beta of Metrics 4.2.0 soon.