-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathemathroughput.go
379 lines (335 loc) · 12.3 KB
/
emathroughput.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
package dynsampler
import (
"encoding/json"
"errors"
"fmt"
"math"
"sync"
"time"
)
// EMAThroughput implements Sampler and attempts to achieve a given throughput
// rate, weighting rare traffic and frequent traffic differently so as to end up
// with the the desired throughput.
//
// Based on the EMASampleRate implementation, EMAThroughput differs in that
// instead of trying to achieve a given sample rate, it tries to reach a given
// throughput of events. During bursts of traffic, it will reduce sample
// rates so as to keep the number of events per second roughly constant.
//
// Like the EMA sampler, it maintains an Exponential Moving Average of counts
// seen per key, and adjusts this average at regular intervals. The weight
// applied to more recent intervals is defined by `weight`, a number between (0,
// 1) - larger values weight the average more toward recent observations. In
// other words, a larger weight will cause sample rates to more quickly adapt to
// traffic patterns, while a smaller weight will result in sample rates that are
// less sensitive to bursts or drops in traffic and thus more consistent over
// time.
//
// New keys that are not found in the EMA will always have a sample
// rate of 1. Keys that occur more frequently will be sampled on a logarithmic
// curve. In other words, every key will be represented at least once in any
// given window and more frequent keys will have their sample rate
// increased proportionally to wind up with the goal throughput.
type EMAThroughput struct {
// AdjustmentInterval defines how often we adjust the moving average from
// recent observations. Default 15s.
AdjustmentInterval time.Duration
// Weight is a value between (0, 1) indicating the weighting factor used to adjust
// the EMA. With larger values, newer data will influence the average more, and older
// values will be factored out more quickly. In mathematical literature concerning EMA,
// this is referred to as the `alpha` constant.
// Default is 0.5
Weight float64
// InitialSampleRate is the sample rate to use during startup, before we
// have accumulated enough data to calculate a reasonable desired sample
// rate. This is mainly useful in situations where unsampled throughput is
// high enough to cause problems.
// Default 10.
InitialSampleRate int
// GoalThroughputPerSec is the target number of events to send per second.
// Sample rates are generated to squash the total throughput down to match the
// goal throughput. Actual throughput may exceed goal throughput. default 100
GoalThroughputPerSec int
// MaxKeys, if greater than 0, limits the number of distinct keys tracked in EMA.
// Once MaxKeys is reached, new keys will not be included in the sample rate map, but
// existing keys will continue to be be counted.
// Defaults to 0
MaxKeys int
// AgeOutValue indicates the threshold for removing keys from the EMA. The EMA of any key will approach 0
// if it is not repeatedly observed, but will never truly reach it, so we have to decide what constitutes "zero".
// Keys with averages below this threshold will be removed from the EMA. Default is the same as Weight, as this prevents
// a key with the smallest integer value (1) from being aged out immediately. This value should generally be <= Weight,
// unless you have very specific reasons to set it higher.
AgeOutValue float64
// BurstMultiple, if set, is multiplied by the sum of the running average of counts to define
// the burst detection threshold. If total counts observed for a given interval exceed the threshold
// EMA is updated immediately, rather than waiting on the AdjustmentInterval.
// Defaults to 2; negative value disables. With a default of 2, if your traffic suddenly doubles,
// burst detection will kick in.
BurstMultiple float64
// BurstDetectionDelay indicates the number of intervals to run after Start is called before burst detection kicks in.
// Defaults to 3
BurstDetectionDelay uint
savedSampleRates map[string]int
currentCounts map[string]float64
movingAverage map[string]float64
burstThreshold float64
currentBurstSum float64
intervalCount uint
burstSignal chan struct{}
// haveData indicates that we have gotten a sample of traffic. Before we've
// gotten any samples of traffic, we should use the default goal sample rate
// for all events instead of sampling everything at 1
haveData bool
updating bool
done chan struct{}
lock sync.Mutex
// used only in tests
testSignalMapsDone chan struct{}
// metrics
requestCount int64
eventCount int64
burstCount int64
}
// Ensure we implement the sampler interface
var _ Sampler = (*EMAThroughput)(nil)
func (e *EMAThroughput) Start() error {
// apply defaults
if e.AdjustmentInterval == 0 {
e.AdjustmentInterval = 15 * time.Second
}
if e.AdjustmentInterval < 1*time.Millisecond {
return fmt.Errorf("the AdjustmentInterval %v is unreasonably short for a throughput sampler", e.AdjustmentInterval)
}
if e.InitialSampleRate == 0 {
e.InitialSampleRate = 10
}
if e.GoalThroughputPerSec == 0 {
e.GoalThroughputPerSec = 100
}
if e.Weight == 0 {
e.Weight = 0.5
}
if e.AgeOutValue == 0 {
e.AgeOutValue = e.Weight
}
if e.BurstMultiple == 0 {
e.BurstMultiple = 2
}
if e.BurstDetectionDelay == 0 {
e.BurstDetectionDelay = 3
}
// Don't override these maps at startup in case they were loaded from a previous state
e.currentCounts = make(map[string]float64)
if e.savedSampleRates == nil {
e.savedSampleRates = make(map[string]int)
}
if e.movingAverage == nil {
e.movingAverage = make(map[string]float64)
}
e.burstSignal = make(chan struct{})
e.done = make(chan struct{})
go func() {
ticker := time.NewTicker(e.AdjustmentInterval)
defer ticker.Stop()
for {
select {
case <-e.burstSignal:
// reset ticker when we get a burst
ticker.Stop()
ticker = time.NewTicker(e.AdjustmentInterval)
e.updateMaps()
case <-ticker.C:
e.updateMaps()
e.intervalCount++
case <-e.done:
return
}
}
}()
return nil
}
func (e *EMAThroughput) Stop() error {
close(e.done)
return nil
}
// updateMaps calculates a new saved rate map based on the contents of the
// counter map
func (e *EMAThroughput) updateMaps() {
e.lock.Lock()
if e.testSignalMapsDone != nil {
defer func() {
e.testSignalMapsDone <- struct{}{}
}()
}
// short circuit if no traffic
if len(e.currentCounts) == 0 {
// No traffic the last interval, don't update anything. This is deliberate to avoid
// the average decaying when there's no traffic (comes in bursts, or there's some kind of outage).
e.lock.Unlock()
return
}
// If there is another updateMaps going, bail
if e.updating {
e.lock.Unlock()
return
}
e.updating = true
// make a local copy of the sample counters for calculation
tmpCounts := e.currentCounts
e.currentCounts = make(map[string]float64)
e.currentBurstSum = 0
e.lock.Unlock()
e.updateEMA(tmpCounts)
// Goal events to send this interval is the total count of events in the EMA
// divided by the desired average sample rate
var sumEvents float64
for _, count := range e.movingAverage {
sumEvents += math.Max(1, count)
}
// Store this for burst detection. This is checked in GetSampleRate
// so we need to grab the lock when we update it.
e.lock.Lock()
e.burstThreshold = sumEvents * e.BurstMultiple
e.lock.Unlock()
// Calculate the desired average sample rate per second based on the volume we've received.
// This is the number of events we'd like to let through per adjustment interval.
goalCount := float64(e.GoalThroughputPerSec) * e.AdjustmentInterval.Seconds()
// goalRatio is the goalCount divided by the sum of all the log values - it
// determines what percentage of the total event space belongs to each key
var logSum float64
for _, count := range e.movingAverage {
// We take the max of (1, count) because count * weight is < 1 for
// very small counts, which throws off the logSum and can cause
// incorrect samples rates to be computed when throughput is low
logSum += math.Log10(math.Max(1, count))
}
goalRatio := goalCount / logSum
newSavedSampleRates := calculateSampleRates(goalRatio, e.movingAverage)
e.lock.Lock()
defer e.lock.Unlock()
e.savedSampleRates = newSavedSampleRates
e.haveData = true
e.updating = false
}
// GetSampleRate takes a key and returns the appropriate sample rate for that
// key.
func (e *EMAThroughput) GetSampleRate(key string) int {
return e.GetSampleRateMulti(key, 1)
}
// GetSampleRateMulti takes a key representing count spans and returns the
// appropriate sample rate for that key.
func (e *EMAThroughput) GetSampleRateMulti(key string, count int) int {
e.lock.Lock()
defer e.lock.Unlock()
e.requestCount++
e.eventCount += int64(count)
// Enforce MaxKeys limit on the size of the map
if e.MaxKeys > 0 {
// If a key already exists, increment it. If not, but we're under the limit, store a new key
if _, found := e.currentCounts[key]; found || len(e.currentCounts) < e.MaxKeys {
e.currentCounts[key] += float64(count)
e.currentBurstSum += float64(count)
}
} else {
e.currentCounts[key] += float64(count)
e.currentBurstSum += float64(count)
}
// Enforce the burst threshold
if e.burstThreshold > 0 && e.currentBurstSum >= e.burstThreshold && e.intervalCount >= e.BurstDetectionDelay {
// reset the burst sum to prevent additional burst updates from occurring while updateMaps is running
e.currentBurstSum = 0
e.burstCount++
// send but don't block - consuming is blocked on updateMaps, which takes the same lock we're holding
select {
case e.burstSignal <- struct{}{}:
default:
}
}
if !e.haveData {
return e.InitialSampleRate
}
if rate, found := e.savedSampleRates[key]; found {
return rate
}
return 1
}
func (e *EMAThroughput) updateEMA(newCounts map[string]float64) {
keysToUpdate := make([]string, 0, len(e.movingAverage))
for key := range e.movingAverage {
keysToUpdate = append(keysToUpdate, key)
}
// Update any existing keys with new values
for _, key := range keysToUpdate {
var newAvg float64
// Was this key seen in the last interval? Adjust by that amount
if val, found := newCounts[key]; found {
newAvg = adjustAverage(e.movingAverage[key], val, e.Weight)
} else {
// Otherwise adjust by zero
newAvg = adjustAverage(e.movingAverage[key], 0, e.Weight)
}
// Age out this value if it's too small to care about for calculating sample rates
// This is also necessary to keep our map from going forever.
if newAvg < e.AgeOutValue {
delete(e.movingAverage, key)
} else {
e.movingAverage[key] = newAvg
}
// We've processed this key - don't process it again when we look at new counts
delete(newCounts, key)
}
for key := range newCounts {
newAvg := adjustAverage(0, newCounts[key], e.Weight)
if newAvg >= e.AgeOutValue {
e.movingAverage[key] = newAvg
}
}
}
type emaThroughputState struct {
// These fields are exported for use by `JSON.Marshal` and `JSON.Unmarshal`
SavedSampleRates map[string]int `json:"saved_sample_rates"`
MovingAverage map[string]float64 `json:"moving_average"`
}
// SaveState returns a byte array with a JSON representation of the sampler state
func (e *EMAThroughput) SaveState() ([]byte, error) {
e.lock.Lock()
defer e.lock.Unlock()
if e.savedSampleRates == nil {
return nil, errors.New("saved sample rate map is nil")
}
if e.movingAverage == nil {
return nil, errors.New("moving average map is nil")
}
s := &emaThroughputState{SavedSampleRates: e.savedSampleRates, MovingAverage: e.movingAverage}
return json.Marshal(s)
}
// LoadState accepts a byte array with a JSON representation of a previous instance's
// state
func (e *EMAThroughput) LoadState(state []byte) error {
e.lock.Lock()
defer e.lock.Unlock()
s := emaThroughputState{}
err := json.Unmarshal(state, &s)
if err != nil {
return err
}
// Load the previously calculated sample rates
e.savedSampleRates = s.SavedSampleRates
e.movingAverage = s.MovingAverage
// Allow GetSampleRate to return calculated sample rates from the loaded map
e.haveData = true
return nil
}
func (e *EMAThroughput) GetMetrics(prefix string) map[string]int64 {
e.lock.Lock()
defer e.lock.Unlock()
mets := map[string]int64{
prefix + "request_count": e.requestCount,
prefix + "event_count": e.eventCount,
prefix + "burst_count": e.burstCount,
prefix + "interval_count": int64(e.intervalCount),
prefix + "keyspace_size": int64(len(e.currentCounts)),
}
return mets
}