Skip to content

Commit

Permalink
fix: race condition in WindowedThroughput sampler (#73)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?

- A "flaky" test in Refinery would fail occasionally with a race
condition in WindowedSampler. This fixes it.

## Short description of the changes

- Use a local temporary variable instead of modifying the count on the
fly, then write it during the existing lock periods.
  • Loading branch information
kentquirk authored Jan 11, 2024
1 parent cca6d8c commit 7a8d666
Showing 1 changed file with 5 additions and 3 deletions.
8 changes: 5 additions & 3 deletions windowedthroughput.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,18 +151,19 @@ func (t *WindowedThroughput) updateMaps() {

// Apply the same aggregation algorithm as total throughput
// Short circuit if no traffic
t.numKeys = len(aggregateCounts)
if t.numKeys == 0 {
numKeys := len(aggregateCounts)
if numKeys == 0 {
// no traffic during the last period.
t.lock.Lock()
defer t.lock.Unlock()
t.numKeys = 0
t.savedSampleRates = make(map[string]int)
return
}
// figure out our target throughput per key over the lookback window.
totalGoalThroughput := t.GoalThroughputPerSec * t.LookbackFrequencyDuration.Seconds()
// floor the throughput but min should be 1 event per bucket per time period
throughputPerKey := math.Max(1, float64(totalGoalThroughput)/float64(t.numKeys))
throughputPerKey := math.Max(1, float64(totalGoalThroughput)/float64(numKeys))
// for each key, calculate sample rate by dividing counted events by the
// desired number of events
newSavedSampleRates := make(map[string]int)
Expand All @@ -174,6 +175,7 @@ func (t *WindowedThroughput) updateMaps() {
t.lock.Lock()
defer t.lock.Unlock()
t.savedSampleRates = newSavedSampleRates
t.numKeys = numKeys
}

// GetSampleRate takes a key and returns the appropriate sample rate for that
Expand Down

0 comments on commit 7a8d666

Please sign in to comment.