Skip to content

Commit

Permalink
Add timing/histogram for statsD receiver as OTLP summary (#3261)
Browse files Browse the repository at this point in the history
  • Loading branch information
gavindoudou authored May 3, 2021
1 parent c9cf817 commit af6248b
Show file tree
Hide file tree
Showing 13 changed files with 248 additions and 63 deletions.
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1156,6 +1156,7 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/moricho/tparallel v0.2.1/go.mod h1:fXEIZxG2vdfl0ZF8b42f5a78EhjjD5mX8qUplsoSU4k=
github.com/morikuni/aec v0.0.0-20170113033406-39771216ff4c/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
Expand Down
20 changes: 7 additions & 13 deletions receiver/statsdreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@ The Following settings are optional:

- `timer_histogram_mapping:`(default value is below): Specify what OTLP type to convert received timing/histogram data to.

// TODO: can add regex support for `match` later.

`"match"`, we only support `"*"` now.

`"statsd_type"` specifies received Statsd data type. Possible values for this setting are `"timing"`, `"timer"` and `"histogram"`.

`"observer_type"` specifies OTLP data type to convert to. The only supported target data type currently is `"gauge"`, which does not perform any aggregation.
Support for `"summary"` data type is planned to be added in the future.
`"observer_type"` specifies OTLP data type to convert to. We support `"gauge"` and `"summary"`. For `"gauge"`, it does not perform any aggregation.
For `"summary`, the statsD receiver will aggregate to one OTLP summary metric for one metric description(the same metric name with the same tags). It will send percentile 0, 10, 50, 90, 95, 100 to the downstream.
TODO: Add a new option to use a smoothed summary like Promethetheus: https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/3261

Example:

Expand All @@ -42,11 +40,9 @@ receivers:
aggregation_interval: 70s
enable_metric_type: true
timer_histogram_mapping:
- match: "*"
statsd_type: "histogram"
- statsd_type: "histogram"
observer_type: "gauge"
- match: "*"
statsd_type: "timing"
- statsd_type: "timing"
observer_type: "gauge"
```
Expand Down Expand Up @@ -120,11 +116,9 @@ receivers:
aggregation_interval: 60s # default
enable_metric_type: false # default
timer_histogram_mapping:
- match: "*"
statsd_type: "histogram"
- statsd_type: "histogram"
observer_type: "gauge"
- match: "*"
statsd_type: "timing"
- statsd_type: "timing"
observer_type: "gauge"
exporters:
Expand Down
11 changes: 1 addition & 10 deletions receiver/statsdreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,15 @@ type Config struct {
func (c *Config) validate() error {

var errors []error
supportMatch := []string{"*"}
supportedStatsdType := []string{"timing", "timer", "histogram"}
supportedObserverType := []string{"gauge"}
supportedObserverType := []string{"gauge", "summary"}

if c.AggregationInterval <= 0 {
errors = append(errors, fmt.Errorf("aggregation_interval must be a positive duration"))
}

var TimerHistogramMappingMissingObjectName bool
for _, eachMap := range c.TimerHistogramMapping {
if eachMap.Match == "" {
TimerHistogramMappingMissingObjectName = true
break
}

if !protocol.Contains(supportMatch, eachMap.Match) {
errors = append(errors, fmt.Errorf("match is not supported: %s", eachMap.Match))
}

if eachMap.StatsdType == "" {
TimerHistogramMappingMissingObjectName = true
Expand Down
33 changes: 6 additions & 27 deletions receiver/statsdreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestLoadConfig(t *testing.T) {
Transport: "custom_transport",
},
AggregationInterval: 70 * time.Second,
TimerHistogramMapping: []protocol.TimerHistogramMapping{{Match: "*", StatsdType: "histogram", ObserverType: "gauge"}, {Match: "*", StatsdType: "timing", ObserverType: "gauge"}},
TimerHistogramMapping: []protocol.TimerHistogramMapping{{StatsdType: "histogram", ObserverType: "gauge"}, {StatsdType: "timing", ObserverType: "gauge"}},
}, r1)
}

Expand All @@ -73,7 +73,6 @@ func TestValidate(t *testing.T) {
const (
negativeAggregationIntervalErr = "aggregation_interval must be a positive duration"
noObjectNameErr = "must specify object name for all TimerHistogramMappings"
matchNotSupportErr = "match is not supported: %s"
statsdTypeNotSupportErr = "statsd_type is not supported: %s"
observerTypeNotSupportErr = "observer_type is not supported: %s"
)
Expand All @@ -83,28 +82,18 @@ func TestValidate(t *testing.T) {
name: "negativeAggregationInterval",
cfg: &Config{
AggregationInterval: -1,
TimerHistogramMapping: []protocol.TimerHistogramMapping{
{Match: "*", StatsdType: "timing", ObserverType: "gauge"},
},
},
expectedErr: negativeAggregationIntervalErr,
},
{
name: "emptyMatch",
cfg: &Config{
AggregationInterval: 10,
TimerHistogramMapping: []protocol.TimerHistogramMapping{
{StatsdType: "timing", ObserverType: "gauge"},
},
},
expectedErr: noObjectNameErr,
expectedErr: negativeAggregationIntervalErr,
},
{
name: "emptyStatsdType",
cfg: &Config{
AggregationInterval: 10,
TimerHistogramMapping: []protocol.TimerHistogramMapping{
{Match: "*", ObserverType: "gauge"},
{ObserverType: "gauge"},
},
},
expectedErr: noObjectNameErr,
Expand All @@ -114,27 +103,17 @@ func TestValidate(t *testing.T) {
cfg: &Config{
AggregationInterval: 10,
TimerHistogramMapping: []protocol.TimerHistogramMapping{
{Match: "*", StatsdType: "timing"},
{StatsdType: "timing"},
},
},
expectedErr: noObjectNameErr,
},
{
name: "MatchNotSupport",
cfg: &Config{
AggregationInterval: 10,
TimerHistogramMapping: []protocol.TimerHistogramMapping{
{Match: "aaa", StatsdType: "timing", ObserverType: "gauge"},
},
},
expectedErr: fmt.Sprintf(matchNotSupportErr, "aaa"),
},
{
name: "StatsdTypeNotSupport",
cfg: &Config{
AggregationInterval: 10,
TimerHistogramMapping: []protocol.TimerHistogramMapping{
{Match: "*", StatsdType: "abc", ObserverType: "gauge"},
{StatsdType: "abc", ObserverType: "gauge"},
},
},
expectedErr: fmt.Sprintf(statsdTypeNotSupportErr, "abc"),
Expand All @@ -144,7 +123,7 @@ func TestValidate(t *testing.T) {
cfg: &Config{
AggregationInterval: 10,
TimerHistogramMapping: []protocol.TimerHistogramMapping{
{Match: "*", StatsdType: "timer", ObserverType: "gauge1"},
{StatsdType: "timer", ObserverType: "gauge1"},
},
},
expectedErr: fmt.Sprintf(observerTypeNotSupportErr, "gauge1"),
Expand Down
6 changes: 5 additions & 1 deletion receiver/statsdreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ const (
defaultEnableMetricType = false
)

var (
defaultTimerHistogramMapping = []protocol.TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}}
)

// NewFactory creates a factory for the StatsD receiver.
func NewFactory() component.ReceiverFactory {
return receiverhelper.NewFactory(
Expand All @@ -57,7 +61,7 @@ func createDefaultConfig() config.Receiver {
},
AggregationInterval: defaultAggregationInterval,
EnableMetricType: defaultEnableMetricType,
TimerHistogramMapping: []protocol.TimerHistogramMapping{{Match: "*", StatsdType: "timer", ObserverType: "gauge"}, {Match: "*", StatsdType: "histogram", ObserverType: "gauge"}},
TimerHistogramMapping: defaultTimerHistogramMapping,
}
}

Expand Down
2 changes: 1 addition & 1 deletion receiver/statsdreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestCreateReceiverWithConfigErr(t *testing.T) {
cfg := &Config{
AggregationInterval: -1,
TimerHistogramMapping: []protocol.TimerHistogramMapping{
{Match: "*", StatsdType: "timing", ObserverType: "gauge"},
{StatsdType: "timing", ObserverType: "gauge"},
},
}
receiver, err := createMetricsReceiver(
Expand Down
1 change: 1 addition & 0 deletions receiver/statsdreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/hashicorp/go-immutable-radix v1.2.0 // indirect
github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/mattn/go-colorable v0.1.7 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe
github.com/onsi/ginkgo v1.14.1 // indirect
github.com/onsi/gomega v1.10.2 // indirect
github.com/pelletier/go-toml v1.8.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions receiver/statsdreceiver/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,7 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
Expand Down
29 changes: 29 additions & 0 deletions receiver/statsdreceiver/protocol/metric_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package protocol
import (
"time"

"github.com/montanaflynn/stats"
"go.opentelemetry.io/collector/consumer/pdata"
)

Expand Down Expand Up @@ -58,3 +59,31 @@ func buildGaugeMetric(parsedMetric statsDMetric, timeNow time.Time) pdata.Instru

return ilm
}

func buildSummaryMetric(summaryMetric summaryMetric) pdata.InstrumentationLibraryMetrics {
dp := pdata.NewSummaryDataPoint()
dp.SetCount(uint64(len(summaryMetric.summaryPoints)))
sum, _ := stats.Sum(summaryMetric.summaryPoints)
dp.SetSum(sum)
dp.SetTimestamp(pdata.TimestampFromTime(summaryMetric.timeNow))

quantile := []float64{0, 10, 50, 90, 95, 100}
for _, v := range quantile {
eachQuantile := pdata.NewValueAtQuantile()
eachQuantile.SetQuantile(v)
eachQuantileValue, _ := stats.PercentileNearestRank(summaryMetric.summaryPoints, v)
eachQuantile.SetValue(eachQuantileValue)
dp.QuantileValues().Append(eachQuantile)
}

nm := pdata.NewMetric()
nm.SetName(summaryMetric.name)
nm.SetDataType(pdata.MetricDataTypeSummary)
nm.Summary().DataPoints().Append(dp)

ilm := pdata.NewInstrumentationLibraryMetrics()
ilm.Metrics().Append(nm)

return ilm

}
34 changes: 34 additions & 0 deletions receiver/statsdreceiver/protocol/metric_translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,37 @@ func TestBuildGaugeMetric(t *testing.T) {
dp.LabelsMap().Insert("mykey2", "myvalue2")
assert.Equal(t, metric, expectedMetrics)
}

func TestBuildSummaryMetric(t *testing.T) {
timeNow := time.Now()

oneSummaryMetric := summaryMetric{
name: "testSummary",
summaryPoints: []float64{1, 2, 4, 6, 5, 3},
labelKeys: []string{"mykey", "mykey2"},
labelValues: []string{"myvalue", "myvalue2"},
timeNow: timeNow,
}

metric := buildSummaryMetric(oneSummaryMetric)
expectedMetric := pdata.NewInstrumentationLibraryMetrics()
expectedMetric.Metrics().Resize(1)
expectedMetric.Metrics().At(0).SetName("testSummary")
expectedMetric.Metrics().At(0).SetDataType(pdata.MetricDataTypeSummary)
expectedMetric.Metrics().At(0).Summary().DataPoints().Resize(1)
expectedMetric.Metrics().At(0).Summary().DataPoints().At(0).SetSum(21)
expectedMetric.Metrics().At(0).Summary().DataPoints().At(0).SetCount(6)
expectedMetric.Metrics().At(0).Summary().DataPoints().At(0).SetTimestamp(pdata.TimestampFromTime(timeNow))
quantile := []float64{0, 10, 50, 90, 95, 100}
value := []float64{1, 1, 3, 6, 6, 6}
for int, v := range quantile {
eachQuantile := pdata.NewValueAtQuantile()
eachQuantile.SetQuantile(v)
eachQuantileValue := value[int]
eachQuantile.SetValue(eachQuantileValue)
expectedMetric.Metrics().At(0).Summary().DataPoints().At(0).QuantileValues().Append(eachQuantile)
}

assert.Equal(t, metric, expectedMetric)

}
58 changes: 56 additions & 2 deletions receiver/statsdreceiver/protocol/statsd_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ const (
)

type TimerHistogramMapping struct {
Match string `mapstructure:"match"`
StatsdType string `mapstructure:"statsd_type"`
ObserverType string `mapstructure:"observer_type"`
}
Expand All @@ -52,12 +51,21 @@ type TimerHistogramMapping struct {
type StatsDParser struct {
gauges map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics
counters map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics
summaries map[statsDMetricdescription]summaryMetric
timersAndDistributions []pdata.InstrumentationLibraryMetrics
enableMetricType bool
observeTimer string
observeHistogram string
}

type summaryMetric struct {
name string
summaryPoints []float64
labelKeys []string
labelValues []string
timeNow time.Time
}

type statsDMetric struct {
description statsDMetricdescription
value string
Expand All @@ -80,6 +88,8 @@ func (p *StatsDParser) Initialize(enableMetricType bool, sendTimerHistogram []Ti
p.gauges = make(map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics)
p.counters = make(map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics)
p.timersAndDistributions = make([]pdata.InstrumentationLibraryMetrics, 0)
p.summaries = make(map[statsDMetricdescription]summaryMetric)

p.enableMetricType = enableMetricType
for _, eachMap := range sendTimerHistogram {
switch eachMap.StatsdType {
Expand Down Expand Up @@ -109,10 +119,14 @@ func (p *StatsDParser) GetMetrics() pdata.Metrics {
rm.InstrumentationLibraryMetrics().Append(metric)
}

for _, summaryMetric := range p.summaries {
metrics.ResourceMetrics().At(0).InstrumentationLibraryMetrics().Append(buildSummaryMetric(summaryMetric))
}

p.gauges = make(map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics)
p.counters = make(map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics)
p.timersAndDistributions = make([]pdata.InstrumentationLibraryMetrics, 0)

p.summaries = make(map[statsDMetricdescription]summaryMetric)
return metrics
}

Expand Down Expand Up @@ -155,12 +169,52 @@ func (p *StatsDParser) Aggregate(line string) error {
switch p.observeHistogram {
case "gauge":
p.timersAndDistributions = append(p.timersAndDistributions, buildGaugeMetric(parsedMetric, timeNowFunc()))
case "summary":
eachSummaryMetric, ok := p.summaries[parsedMetric.description]
if !ok {
p.summaries[parsedMetric.description] = summaryMetric{
name: parsedMetric.description.name,
summaryPoints: []float64{parsedMetric.floatvalue},
labelKeys: parsedMetric.labelKeys,
labelValues: parsedMetric.labelValues,
timeNow: timeNowFunc(),
}
} else {
points := eachSummaryMetric.summaryPoints
p.summaries[parsedMetric.description] = summaryMetric{
name: parsedMetric.description.name,
summaryPoints: append(points, parsedMetric.floatvalue),
labelKeys: parsedMetric.labelKeys,
labelValues: parsedMetric.labelValues,
timeNow: timeNowFunc(),
}
}
}

case statsdTiming:
switch p.observeTimer {
case "gauge":
p.timersAndDistributions = append(p.timersAndDistributions, buildGaugeMetric(parsedMetric, timeNowFunc()))
case "summary":
eachSummaryMetric, ok := p.summaries[parsedMetric.description]
if !ok {
p.summaries[parsedMetric.description] = summaryMetric{
name: parsedMetric.description.name,
summaryPoints: []float64{parsedMetric.floatvalue},
labelKeys: parsedMetric.labelKeys,
labelValues: parsedMetric.labelValues,
timeNow: timeNowFunc(),
}
} else {
points := eachSummaryMetric.summaryPoints
p.summaries[parsedMetric.description] = summaryMetric{
name: parsedMetric.description.name,
summaryPoints: append(points, parsedMetric.floatvalue),
labelKeys: parsedMetric.labelKeys,
labelValues: parsedMetric.labelValues,
timeNow: timeNowFunc(),
}
}
}
}

Expand Down
Loading

0 comments on commit af6248b

Please sign in to comment.