Skip to content

Commit

Permalink
feat: Add metrics counter retrieval (#65)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?

To date, the samplers have had no way of getting any metrics from them,
which has sometimes made it hard to support. This adds a simple
GetMetrics call that retrieves a map of named metrics on request.

GetMetrics returns a map of metrics about the sampler's performance. All
values are returned as int64; counters are cumulative and the names
always end with "_count", while gauges are instantaneous with no
particular naming convention. All names are prefixed with the string
passed in.

## Short description of the changes

- Add GetMetrics function to Sampler interface and all samplers
  • Loading branch information
kentquirk authored Jun 6, 2023
1 parent 3a83381 commit edb5cae
Show file tree
Hide file tree
Showing 10 changed files with 188 additions and 3 deletions.
18 changes: 18 additions & 0 deletions avgsamplerate.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ type AvgSampleRate struct {
done chan struct{}

lock sync.Mutex

// metrics
requestCount int64
eventCount int64
}

// Ensure we implement the sampler interface
Expand Down Expand Up @@ -151,6 +155,9 @@ func (a *AvgSampleRate) GetSampleRateMulti(key string, count int) int {
a.lock.Lock()
defer a.lock.Unlock()

a.requestCount++
a.eventCount += int64(count)

// Enforce MaxKeys limit on the size of the map
if a.MaxKeys > 0 {
// If a key already exists, increment it. If not, but we're under the limit, store a new key
Expand Down Expand Up @@ -205,3 +212,14 @@ func (a *AvgSampleRate) LoadState(state []byte) error {

return nil
}

func (a *AvgSampleRate) GetMetrics(prefix string) map[string]int64 {
a.lock.Lock()
defer a.lock.Unlock()
mets := map[string]int64{
prefix + "request_count": a.requestCount,
prefix + "event_count": a.eventCount,
prefix + "keyspace_size": int64(len(a.currentCounts)),
}
return mets
}
18 changes: 18 additions & 0 deletions avgsamplewithmin.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ type AvgSampleWithMin struct {
done chan struct{}

lock sync.Mutex

// metrics
requestCount int64
eventCount int64
}

// Ensure we implement the sampler interface
Expand Down Expand Up @@ -168,6 +172,9 @@ func (a *AvgSampleWithMin) GetSampleRateMulti(key string, count int) int {
a.lock.Lock()
defer a.lock.Unlock()

a.requestCount++
a.eventCount += int64(count)

// Enforce MaxKeys limit on the size of the map
if a.MaxKeys > 0 {
// If a key already exists, increment it. If not, but we're under the limit, store a new key
Expand Down Expand Up @@ -195,3 +202,14 @@ func (a *AvgSampleWithMin) SaveState() ([]byte, error) {
func (a *AvgSampleWithMin) LoadState(state []byte) error {
return nil
}

func (a *AvgSampleWithMin) GetMetrics(prefix string) map[string]int64 {
a.lock.Lock()
defer a.lock.Unlock()
mets := map[string]int64{
prefix + "request_count": a.requestCount,
prefix + "event_count": a.eventCount,
prefix + "keyspace_size": int64(len(a.currentCounts)),
}
return mets
}
6 changes: 6 additions & 0 deletions dynsampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,10 @@ type Sampler interface {
// LoadState accepts a byte array containing the serialized, previous state of the sampler
// implementation. It should be called before `Start`.
LoadState([]byte) error

// GetMetrics returns a map of metrics about the sampler's performance.
// All values are returned as int64; counters are cumulative and the names
// always end with "_count", while gauges are instantaneous with no particular naming convention.
// All names are prefixed with the given string.
GetMetrics(prefix string) map[string]int64
}
22 changes: 22 additions & 0 deletions emasamplerate.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ type EMASampleRate struct {

// used only in tests
testSignalMapsDone chan struct{}

// metrics
requestCount int64
eventCount int64
burstCount int64
}

// Ensure we implement the sampler interface
Expand Down Expand Up @@ -238,6 +243,9 @@ func (e *EMASampleRate) GetSampleRateMulti(key string, count int) int {
e.lock.Lock()
defer e.lock.Unlock()

e.requestCount++
e.eventCount += int64(count)

// Enforce MaxKeys limit on the size of the map
if e.MaxKeys > 0 {
// If a key already exists, increment it. If not, but we're under the limit, store a new key
Expand All @@ -254,6 +262,7 @@ func (e *EMASampleRate) GetSampleRateMulti(key string, count int) int {
if e.burstThreshold > 0 && e.currentBurstSum >= e.burstThreshold && e.intervalCount >= e.BurstDetectionDelay {
// reset the burst sum to prevent additional burst updates from occurring while updateMaps is running
e.currentBurstSum = 0
e.burstCount++
// send but don't block - consuming is blocked on updateMaps, which takes the same lock we're holding
select {
case e.burstSignal <- struct{}{}:
Expand Down Expand Up @@ -348,6 +357,19 @@ func (e *EMASampleRate) LoadState(state []byte) error {
return nil
}

func (e *EMASampleRate) GetMetrics(prefix string) map[string]int64 {
e.lock.Lock()
defer e.lock.Unlock()
mets := map[string]int64{
prefix + "request_count": e.requestCount,
prefix + "event_count": e.eventCount,
prefix + "burst_count": e.burstCount,
prefix + "interval_count": int64(e.intervalCount),
prefix + "keyspace_size": int64(len(e.currentCounts)),
}
return mets
}

func adjustAverage(oldAvg, value float64, alpha float64) float64 {
adjustedNewVal := value * alpha
adjustedOldAvg := (1.0 - alpha) * oldAvg
Expand Down
22 changes: 22 additions & 0 deletions emathroughput.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ type EMAThroughput struct {

// used only in tests
testSignalMapsDone chan struct{}

// metrics
requestCount int64
eventCount int64
burstCount int64
}

// Ensure we implement the sampler interface
Expand Down Expand Up @@ -249,6 +254,9 @@ func (e *EMAThroughput) GetSampleRateMulti(key string, count int) int {
e.lock.Lock()
defer e.lock.Unlock()

e.requestCount++
e.eventCount += int64(count)

// Enforce MaxKeys limit on the size of the map
if e.MaxKeys > 0 {
// If a key already exists, increment it. If not, but we're under the limit, store a new key
Expand All @@ -265,6 +273,7 @@ func (e *EMAThroughput) GetSampleRateMulti(key string, count int) int {
if e.burstThreshold > 0 && e.currentBurstSum >= e.burstThreshold && e.intervalCount >= e.BurstDetectionDelay {
// reset the burst sum to prevent additional burst updates from occurring while updateMaps is running
e.currentBurstSum = 0
e.burstCount++
// send but don't block - consuming is blocked on updateMaps, which takes the same lock we're holding
select {
case e.burstSignal <- struct{}{}:
Expand Down Expand Up @@ -358,3 +367,16 @@ func (e *EMAThroughput) LoadState(state []byte) error {

return nil
}

func (e *EMAThroughput) GetMetrics(prefix string) map[string]int64 {
e.lock.Lock()
defer e.lock.Unlock()
mets := map[string]int64{
prefix + "request_count": e.requestCount,
prefix + "event_count": e.eventCount,
prefix + "burst_count": e.burstCount,
prefix + "interval_count": int64(e.intervalCount),
prefix + "keyspace_size": int64(len(e.currentCounts)),
}
return mets
}
18 changes: 18 additions & 0 deletions onlyonce.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ type OnlyOnce struct {
seen map[string]bool
done chan struct{}

// metrics
requestCount int64
eventCount int64

lock sync.Mutex
}

Expand Down Expand Up @@ -99,6 +103,9 @@ func (o *OnlyOnce) GetSampleRate(key string) int {
func (o *OnlyOnce) GetSampleRateMulti(key string, count int) int {
o.lock.Lock()
defer o.lock.Unlock()
o.requestCount++
o.eventCount += int64(count)

if _, found := o.seen[key]; found {
return 1000000000
}
Expand All @@ -115,3 +122,14 @@ func (o *OnlyOnce) SaveState() ([]byte, error) {
func (o *OnlyOnce) LoadState(state []byte) error {
return nil
}

func (o *OnlyOnce) GetMetrics(prefix string) map[string]int64 {
o.lock.Lock()
defer o.lock.Unlock()
mets := map[string]int64{
prefix + "request_count": o.requestCount,
prefix + "event_count": o.eventCount,
prefix + "keyspace_size": int64(len(o.seen)),
}
return mets
}
19 changes: 19 additions & 0 deletions perkeythroughput.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ type PerKeyThroughput struct {
done chan struct{}

lock sync.Mutex

// metrics
requestCount int64
eventCount int64
}

// Ensure we implement the sampler interface
Expand Down Expand Up @@ -129,6 +133,10 @@ func (p *PerKeyThroughput) GetSampleRate(key string) int {
func (p *PerKeyThroughput) GetSampleRateMulti(key string, count int) int {
p.lock.Lock()
defer p.lock.Unlock()

p.requestCount++
p.eventCount += int64(count)

// Enforce MaxKeys limit on the size of the map
if p.MaxKeys > 0 {
// If a key already exists, add the count. If not, but we're under the limit, store a new key
Expand All @@ -153,3 +161,14 @@ func (p *PerKeyThroughput) SaveState() ([]byte, error) {
func (p *PerKeyThroughput) LoadState(state []byte) error {
return nil
}

func (p *PerKeyThroughput) GetMetrics(prefix string) map[string]int64 {
p.lock.Lock()
defer p.lock.Unlock()
mets := map[string]int64{
prefix + "request_count": p.requestCount,
prefix + "event_count": p.eventCount,
prefix + "keyspace_size": int64(len(p.currentCounts)),
}
return mets
}
24 changes: 24 additions & 0 deletions static.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package dynsampler

import "sync"

// Static implements Sampler with a static mapping for sample rates. This is
// useful if you have a known set of keys that you want to sample at specific
// rates and apply a default to everything else.
Expand All @@ -8,6 +10,12 @@ type Static struct {
Rates map[string]int
// Default is the value to use if the key is not whitelisted in Rates
Default int

lock sync.Mutex

// metrics
requestCount int64
eventCount int64
}

// Ensure we implement the sampler interface
Expand All @@ -34,6 +42,11 @@ func (s *Static) GetSampleRate(key string) int {
// GetSampleRateMulti takes a key representing count spans and returns the
// appropriate sample rate for that key.
func (s *Static) GetSampleRateMulti(key string, count int) int {
s.lock.Lock()
defer s.lock.Unlock()

s.requestCount++
s.eventCount += int64(count)
if rate, found := s.Rates[key]; found {
return rate
}
Expand All @@ -49,3 +62,14 @@ func (s *Static) SaveState() ([]byte, error) {
func (s *Static) LoadState(state []byte) error {
return nil
}

func (s *Static) GetMetrics(prefix string) map[string]int64 {
s.lock.Lock()
defer s.lock.Unlock()
mets := map[string]int64{
prefix + "request_count": s.requestCount,
prefix + "event_count": s.eventCount,
prefix + "keyspace_size": int64(len(s.Rates)),
}
return mets
}
19 changes: 19 additions & 0 deletions totalthroughput.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ type TotalThroughput struct {
done chan struct{}

lock sync.Mutex

// metrics
requestCount int64
eventCount int64
}

// Ensure we implement the sampler interface
Expand Down Expand Up @@ -138,6 +142,10 @@ func (t *TotalThroughput) GetSampleRate(key string) int {
func (t *TotalThroughput) GetSampleRateMulti(key string, count int) int {
t.lock.Lock()
defer t.lock.Unlock()

t.requestCount++
t.eventCount += int64(count)

// Enforce MaxKeys limit on the size of the map
if t.MaxKeys > 0 {
// If a key already exists, increment it. If not, but we're under the limit, store a new key
Expand All @@ -162,3 +170,14 @@ func (t *TotalThroughput) SaveState() ([]byte, error) {
func (t *TotalThroughput) LoadState(state []byte) error {
return nil
}

func (t *TotalThroughput) GetMetrics(prefix string) map[string]int64 {
t.lock.Lock()
defer t.lock.Unlock()
mets := map[string]int64{
prefix + "request_count": t.requestCount,
prefix + "event_count": t.eventCount,
prefix + "keyspace_size": int64(len(t.currentCounts)),
}
return mets
}
25 changes: 22 additions & 3 deletions windowedthroughput.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ type WindowedThroughput struct {
indexGenerator IndexGenerator

lock sync.Mutex

// metrics
requestCount int64
eventCount int64
numKeys int
}

// Ensure we implement the sampler interface
Expand Down Expand Up @@ -146,8 +151,8 @@ func (t *WindowedThroughput) updateMaps() {

// Apply the same aggregation algorithm as total throughput
// Short circuit if no traffic
numKeys := len(aggregateCounts)
if numKeys == 0 {
t.numKeys = len(aggregateCounts)
if t.numKeys == 0 {
// no traffic during the last period.
t.lock.Lock()
defer t.lock.Unlock()
Expand All @@ -157,7 +162,7 @@ func (t *WindowedThroughput) updateMaps() {
// figure out our target throughput per key over the lookback window.
totalGoalThroughput := t.GoalThroughputPerSec * t.LookbackFrequencyDuration.Seconds()
// floor the throughput but min should be 1 event per bucket per time period
throughputPerKey := math.Max(1, float64(totalGoalThroughput)/float64(numKeys))
throughputPerKey := math.Max(1, float64(totalGoalThroughput)/float64(t.numKeys))
// for each key, calculate sample rate by dividing counted events by the
// desired number of events
newSavedSampleRates := make(map[string]int)
Expand All @@ -180,6 +185,9 @@ func (t *WindowedThroughput) GetSampleRate(key string) int {
// GetSampleRateMulti takes a key representing count spans and returns the
// appropriate sample rate for that key.
func (t *WindowedThroughput) GetSampleRateMulti(key string, count int) int {
t.requestCount++
t.eventCount += int64(count)

// Insert the new key into the map.
current := t.indexGenerator.GetCurrentIndex()
err := t.countList.IncrementKey(key, current, count)
Expand All @@ -206,3 +214,14 @@ func (t *WindowedThroughput) SaveState() ([]byte, error) {
func (t *WindowedThroughput) LoadState(state []byte) error {
return nil
}

func (t *WindowedThroughput) GetMetrics(prefix string) map[string]int64 {
t.lock.Lock()
defer t.lock.Unlock()
mets := map[string]int64{
prefix + "request_count": t.requestCount,
prefix + "event_count": t.eventCount,
prefix + "keyspace_size": int64(t.numKeys),
}
return mets
}

0 comments on commit edb5cae

Please sign in to comment.