
Commit

aggregation/txmetrics: use RepresentativeCount
axw committed Jun 25, 2020
1 parent 4d381a3 commit f266588
Showing 2 changed files with 96 additions and 9 deletions.
39 changes: 30 additions & 9 deletions x-pack/apm-server/aggregation/txmetrics/aggregator.go
@@ -7,6 +7,7 @@ package txmetrics
import (
"context"
"fmt"
"math"
"strings"
"sync"
"time"
@@ -26,6 +27,16 @@ import (
const (
minDuration time.Duration = 0
maxDuration time.Duration = time.Hour

// We scale transaction counts in the histogram, which only permits
// storing integer counts, to allow for fractional transactions due to
// sampling.
//
// e.g. if the sampling rate is 0.4, then each sampled transaction has a
// representative count of 2.5 (1/0.4). If we receive two such transactions
// we will record a count of 5000 (2 * 2.5 * histogramCountScale). When we
// publish metrics, we will scale down to 5 (5000 / histogramCountScale).
histogramCountScale = 1000
)

// Aggregator aggregates transaction durations, periodically publishing histogram metrics.
@@ -192,21 +203,22 @@ func (a *Aggregator) AggregateTransformables(in []transform.Transformable) []tra
func (a *Aggregator) AggregateTransaction(tx *model.Transaction) *model.Metricset {
key := a.makeTransactionAggregationKey(tx)
hash := key.hash()
count := transactionCount(tx)
duration := time.Duration(tx.Duration * float64(time.Millisecond))
if a.updateTransactionMetrics(key, hash, duration) {
if a.updateTransactionMetrics(key, hash, count, duration) {
return nil
}
// Too many aggregation keys: could not update metrics, so immediately
// publish a single-value metric document.
//
// TODO(axw) log a warning with a rate-limit, increment a counter.
counts := []int64{1}
counts := []int64{int64(math.Round(count))}
values := []float64{float64(durationMicros(duration))}
metricset := makeMetricset(key, hash, time.Now(), counts, values)
return &metricset
}

func (a *Aggregator) updateTransactionMetrics(key transactionAggregationKey, hash uint64, duration time.Duration) bool {
func (a *Aggregator) updateTransactionMetrics(key transactionAggregationKey, hash uint64, count float64, duration time.Duration) bool {
if duration < minDuration {
duration = minDuration
} else if duration > maxDuration {
@@ -224,7 +236,7 @@ func (a *Aggregator) updateTransactionMetrics(key transactionAggregationKey, has
if ok {
for offset = range entries {
if entries[offset].transactionAggregationKey == key {
entries[offset].recordDuration(duration)
entries[offset].recordDuration(duration, count)
return true
}
}
@@ -237,7 +249,7 @@ func (a *Aggregator) updateTransactionMetrics(key transactionAggregationKey, has
for i := range entries[offset:] {
if entries[offset+i].transactionAggregationKey == key {
m.mu.Unlock()
entries[offset+i].recordDuration(duration)
entries[offset+i].recordDuration(duration, count)
return true
}
}
@@ -256,7 +268,7 @@ func (a *Aggregator) updateTransactionMetrics(key transactionAggregationKey, has
} else {
entry.transactionMetrics.histogram.Reset()
}
entry.recordDuration(duration)
entry.recordDuration(duration, count)
m.m[hash] = append(entries, entry)
m.entries++
m.mu.Unlock()
@@ -401,8 +413,9 @@ type transactionMetrics struct {
histogram *hdrhistogram.Histogram
}

func (m *transactionMetrics) recordDuration(d time.Duration) {
m.histogram.RecordValueAtomic(durationMicros(d))
func (m *transactionMetrics) recordDuration(d time.Duration, n float64) {
count := int64(math.Round(n * histogramCountScale))
m.histogram.RecordValuesAtomic(durationMicros(d), count)
}

func (m *transactionMetrics) histogramBuckets() (counts []int64, values []float64) {
@@ -418,12 +431,20 @@ func (m *transactionMetrics) histogramBuckets() (counts []int64, values []float6
if b.Count <= 0 {
continue
}
counts = append(counts, b.Count)
count := math.Round(float64(b.Count) / histogramCountScale)
counts = append(counts, int64(count))
values = append(values, float64(b.To))
}
return counts, values
}

func transactionCount(tx *model.Transaction) float64 {
if tx.RepresentativeCount > 0 {
return tx.RepresentativeCount
}
return 1
}

func durationMicros(d time.Duration) int64 {
return int64(d / time.Microsecond)
}
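
The histogramCountScale comment above walks through the scaling arithmetic in prose. The standalone snippet below is not part of the commit; it is a minimal sketch that replays that worked example (sampling rate 0.4, two sampled transactions) using the same scale factor of 1000.

package main

import (
	"fmt"
	"math"
)

// Mirrors the histogramCountScale constant added in aggregator.go: fractional
// representative counts are multiplied by this factor before being stored as
// integer histogram counts, and divided by it again when metrics are published.
const histogramCountScale = 1000

func main() {
	// A sampling rate of 0.4 means each sampled transaction represents
	// 1/0.4 = 2.5 transactions.
	representativeCount := 1 / 0.4

	// Recording two such transactions stores round(2.5*1000) twice, i.e. 5000.
	var stored int64
	for i := 0; i < 2; i++ {
		stored += int64(math.Round(representativeCount * histogramCountScale))
	}
	fmt.Println(stored) // 5000

	// Publishing scales the bucket count back down: round(5000/1000) = 5.
	published := int64(math.Round(float64(stored) / histogramCountScale))
	fmt.Println(published) // 5
}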
66 changes: 66 additions & 0 deletions x-pack/apm-server/aggregation/txmetrics/aggregator_test.go
@@ -219,6 +219,71 @@ func TestAggregatorRunPublishErrors(t *testing.T) {
}
}

func TestAggregateRepresentativeCount(t *testing.T) {
reqs := make(chan publish.PendingReq, 1)

agg, err := txmetrics.NewAggregator(txmetrics.AggregatorConfig{
Report: makeChanReporter(reqs),
MaxTransactionGroups: 1,
MetricsInterval: time.Microsecond,
HDRHistogramSignificantFigures: 1,
RUMUserAgentLRUSize: 1,
})
require.NoError(t, err)

// Record a transaction group so subsequent calls yield immediate metricsets,
// and to demonstrate that fractional transaction counts are accumulated.
agg.AggregateTransaction(&model.Transaction{Name: "fnord", RepresentativeCount: 1})
agg.AggregateTransaction(&model.Transaction{Name: "fnord", RepresentativeCount: 1.5})

for _, test := range []struct {
representativeCount float64
expectedCount int64
}{{
representativeCount: 0,
expectedCount: 1,
}, {
representativeCount: -1,
expectedCount: 1,
}, {
representativeCount: 2,
expectedCount: 2,
}, {
representativeCount: 1.50, // round half away from zero
expectedCount: 2,
}} {
m := agg.AggregateTransaction(&model.Transaction{
Name: "foo",
RepresentativeCount: test.representativeCount,
})
require.NotNil(t, m)

m.Timestamp = time.Time{}
assert.Equal(t, &model.Metricset{
Metadata: model.Metadata{},
TimeseriesInstanceID: ":foo:1db641f187113b17",
Transaction: model.MetricsetTransaction{
Name: "foo",
Root: true,
},
Samples: []model.Sample{{
Name: "transaction.duration.histogram",
Counts: []int64{test.expectedCount},
Values: []float64{0},
}},
}, m)
}

stopAggregator := runAggregator(agg)
defer stopAggregator()

req := expectPublish(t, reqs)
require.Len(t, req.Transformables, 1)
metricset := req.Transformables[0].(*model.Metricset)
require.Len(t, metricset.Samples, 1)
assert.Equal(t, []int64{3 /*round(1+1.5)*/}, metricset.Samples[0].Counts)
}
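
The test above assumes model.Transaction.RepresentativeCount has already been populated by the caller; the aggregator's comment implies it is the inverse of the sampling rate (0.4 -> 2.5). The helper below is a hypothetical sketch, not part of the commit, and its name is illustrative only.

package main

import "fmt"

// representativeCountForSampleRate is a hypothetical helper (not part of this
// commit) showing how a caller might populate RepresentativeCount: the inverse
// of the sampling rate (0.4 -> 2.5), falling back to 1 when the rate is
// unknown or invalid, mirroring transactionCount's handling of non-positive
// counts.
func representativeCountForSampleRate(sampleRate float64) float64 {
	if sampleRate <= 0 || sampleRate > 1 {
		return 1
	}
	return 1 / sampleRate
}

func main() {
	fmt.Println(representativeCountForSampleRate(0.4)) // 2.5
	fmt.Println(representativeCountForSampleRate(0))   // 1
}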

func TestHDRHistogramSignificantFigures(t *testing.T) {
testHDRHistogramSignificantFigures(t, 1)
testHDRHistogramSignificantFigures(t, 2)
@@ -346,6 +411,7 @@ func makeChanReporter(ch chan<- publish.PendingReq) publish.Reporter {
}

func expectPublish(t *testing.T, ch <-chan publish.PendingReq) publish.PendingReq {
t.Helper()
select {
case req := <-ch:
return req
