Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Krka/poll first entry #1

Open
wants to merge 1 commit into
base: ckozak/lock_free_exponential_decaying_reservoir
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
import com.codahale.metrics.WeightedSnapshot.WeightedSample;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

/**
* A lock-free exponentially-decaying random reservoir of {@code long}s. Uses Cormode et al's
Expand Down Expand Up @@ -40,28 +43,26 @@
public final class LockFreeExponentiallyDecayingReservoir implements Reservoir {

private static final double SECONDS_PER_NANO = .000_000_001D;

private static final WeightedSample DUMMY = new WeightedSample(0, 0);
private static final List<Double> DUMMY_KEYS = new ArrayList<>();

private static final AtomicReferenceFieldUpdater<LockFreeExponentiallyDecayingReservoir, State> stateUpdater =
AtomicReferenceFieldUpdater.newUpdater(LockFreeExponentiallyDecayingReservoir.class, State.class, "state");

private final int size;
private final long rescaleThresholdNanos;
private final Clock clock;

private volatile State state;

private static final class State {

private static final AtomicIntegerFieldUpdater<State> countUpdater =
AtomicIntegerFieldUpdater.newUpdater(State.class, "count");

private final double alphaNanos;
private final int size;
private final long startTick;
// Count is updated after samples are successfully added to the map.
private final ConcurrentSkipListMap<Double, WeightedSample> values;

private volatile int count;

State(
double alphaNanos,
int size,
Expand All @@ -72,21 +73,26 @@ private static final class State {
this.size = size;
this.startTick = startTick;
this.values = values;
this.count = count;

final int toAdd = size - count;
if (DUMMY_KEYS.size() < toAdd) {
ensureDummyKeySize(toAdd);
}
for (int i = 0; i < toAdd; i++) {
values.put(DUMMY_KEYS.get(i), DUMMY);
}
}

private void update(long value, long timestampNanos) {
double itemWeight = weight(timestampNanos - startTick);
double priority = itemWeight / ThreadLocalRandom.current().nextDouble();
boolean mapIsFull = count >= size;
if (!mapIsFull || values.firstKey() < priority) {
addSample(priority, value, itemWeight, mapIsFull);
if (values.firstKey() < priority) {
addSample(priority, value, itemWeight);
}
}

private void addSample(double priority, long value, double itemWeight, boolean bypassIncrement) {
if (values.putIfAbsent(priority, new WeightedSample(value, itemWeight)) == null
&& (bypassIncrement || countUpdater.incrementAndGet(this) > size)) {
private void addSample(double priority, long value, double itemWeight) {
if (values.putIfAbsent(priority, new WeightedSample(value, itemWeight)) == null) {
values.pollFirstEntry();
}
}
Expand Down Expand Up @@ -134,6 +140,16 @@ private double weight(long durationNanos) {
}
}

private static void ensureDummyKeySize(int wantedCapacity) {
synchronized (DUMMY_KEYS) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This synchronization should only trigger on the first creation

int curSize = DUMMY_KEYS.size();
while (curSize < wantedCapacity) {
DUMMY_KEYS.add(-1.0 - curSize);
curSize++;
}
}
}

private static final class RescalingConsumer implements BiConsumer<Double, WeightedSample> {
private final double scalingFactor;
private final ConcurrentSkipListMap<Double, WeightedSample> values;
Expand All @@ -147,7 +163,7 @@ private static final class RescalingConsumer implements BiConsumer<Double, Weigh
@Override
public void accept(Double priority, WeightedSample sample) {
double newWeight = sample.weight * scalingFactor;
if (Double.compare(newWeight, 0) == 0) {
if (Double.compare(newWeight, 0) <= 0) {
return;
}
WeightedSample newSample = new WeightedSample(sample.value, newWeight);
Expand All @@ -160,15 +176,17 @@ public void accept(Double priority, WeightedSample sample) {
private LockFreeExponentiallyDecayingReservoir(int size, double alpha, Duration rescaleThreshold, Clock clock) {
// Scale alpha to nanoseconds
double alphaNanos = alpha * SECONDS_PER_NANO;
this.size = size;
this.clock = clock;
this.rescaleThresholdNanos = rescaleThreshold.toNanos();
this.state = new State(alphaNanos, size, clock.getTick(), 0, new ConcurrentSkipListMap<>());
}

@Override
public int size() {
return Math.min(size, state.count);
// This is never called in practice, so it does not need to be fast
return (int) state.values.keySet().stream()
.filter(prio -> prio > 0)
.count();
}

@Override
Expand Down Expand Up @@ -202,7 +220,11 @@ private State doRescale(long currentTick, State stateSnapshot) {
@Override
public Snapshot getSnapshot() {
State stateSnapshot = rescaleIfNeeded(clock.getTick());
return new WeightedSnapshot(stateSnapshot.values.values());
final List<WeightedSample> samples = stateSnapshot.values.entrySet().stream()
.filter(e -> e.getKey() > 0)
.map(Map.Entry::getValue)
.collect(Collectors.toList());
return new WeightedSnapshot(samples);
}

public static Builder builder() {
Expand Down