Skip to content

Commit

Permalink
Add timing/histogram for statsD receiver as OTLP gauge (open-telemetr…
Browse files Browse the repository at this point in the history
  • Loading branch information
gavindoudou authored and pmatyjasek-sumo committed Apr 28, 2021
1 parent d60e768 commit 3330d4f
Show file tree
Hide file tree
Showing 12 changed files with 468 additions and 83 deletions.
33 changes: 30 additions & 3 deletions receiver/statsdreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,17 @@ The Following settings are optional:

- `enable_metric_type: true`(default value is false): Enable the statsd receiver to be able to emit the metric type(gauge, counter, timer(in the future), histogram(in the future)) as a label.

- `timer_histogram_mapping:`(default value is below): Specify what OTLP type to convert received timing/histogram data to.

// TODO: can add regex support for `match` later.

`"match"`, we only support `"*"` now.

`"statsd_type"` specifies received Statsd data type. Possible values for this setting are `"timing"`, `"timer"` and `"histogram"`.

`"observer_type"` specifies OTLP data type to convert to. The only supported target data type currently is `"gauge"`, which does not perform any aggregation.
Support for `"summary"` data type is planned to be added in the future.

Example:

```yaml
Expand All @@ -30,6 +41,13 @@ receivers:
endpoint: "localhost:8127"
aggregation_interval: 70s
enable_metric_type: true
timer_histogram_mapping:
- match: "*"
statsd_type: "histogram"
observer_type: "gauge"
- match: "*"
statsd_type: "timing"
observer_type: "gauge"
```
The full list of settings exposed for this receiver are documented [here](./config.go)
Expand Down Expand Up @@ -75,19 +93,21 @@ General format is:
`<name>:<value>|c|@<sample-rate>|#<tag1-key>:<tag1-value>`

It supports sample rate.
TODO: Use OTLP type(Sum data points, with AggregationTemporality=Delta and Monotonic=False) for transferred data types (now we are using OpenCensus types).
TODO: Need to change the implementation part for sample rate after OTLP supports sample rate as a parameter later.


### Gauge

`<name>:<value>|g|@<sample-rate>|#<tag1-key>:<tag1-value>`

TODO: Use OTLP type for transferred data types (now we are using OpenCensus types).

### Timer

TODO: add support for timer and histogram.
`<name>:<value>|ms|@<sample-rate>|#<tag1-key>:<tag1-value>`
`<name>:<value>|h|@<sample-rate>|#<tag1-key>:<tag1-value>`

It supports sample rate.


## Testing

Expand All @@ -99,6 +119,13 @@ receivers:
endpoint: "localhost:8125" # default
aggregation_interval: 60s # default
enable_metric_type: false # default
timer_histogram_mapping:
- match: "*"
statsd_type: "histogram"
observer_type: "gauge"
- match: "*"
statsd_type: "timing"
observer_type: "gauge"
exporters:
file:
Expand Down
59 changes: 56 additions & 3 deletions receiver/statsdreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,69 @@
package statsdreceiver

import (
"fmt"
"time"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/consumer/consumererror"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/protocol"
)

// Config defines configuration for StatsD receiver.
type Config struct {
config.ReceiverSettings `mapstructure:",squash"`
NetAddr confignet.NetAddr `mapstructure:",squash"`
AggregationInterval time.Duration `mapstructure:"aggregation_interval"`
EnableMetricType bool `mapstructure:"enable_metric_type"`
NetAddr confignet.NetAddr `mapstructure:",squash"`
AggregationInterval time.Duration `mapstructure:"aggregation_interval"`
EnableMetricType bool `mapstructure:"enable_metric_type"`
TimerHistogramMapping []protocol.TimerHistogramMapping `mapstructure:"timer_histogram_mapping"`
}

func (c *Config) validate() error {

var errors []error
supportMatch := []string{"*"}
supportedStatsdType := []string{"timing", "timer", "histogram"}
supportedObserverType := []string{"gauge"}

if c.AggregationInterval <= 0 {
errors = append(errors, fmt.Errorf("aggregation_interval must be a positive duration"))
}

var TimerHistogramMappingMissingObjectName bool
for _, eachMap := range c.TimerHistogramMapping {
if eachMap.Match == "" {
TimerHistogramMappingMissingObjectName = true
break
}

if !protocol.Contains(supportMatch, eachMap.Match) {
errors = append(errors, fmt.Errorf("match is not supported: %s", eachMap.Match))
}

if eachMap.StatsdType == "" {
TimerHistogramMappingMissingObjectName = true
break
}

if !protocol.Contains(supportedStatsdType, eachMap.StatsdType) {
errors = append(errors, fmt.Errorf("statsd_type is not supported: %s", eachMap.StatsdType))
}

if eachMap.ObserverType == "" {
TimerHistogramMappingMissingObjectName = true
break
}

if !protocol.Contains(supportedObserverType, eachMap.ObserverType) {
errors = append(errors, fmt.Errorf("observer_type is not supported: %s", eachMap.ObserverType))
}
}

if TimerHistogramMappingMissingObjectName {
errors = append(errors, fmt.Errorf("must specify object name for all TimerHistogramMappings"))
}

return consumererror.Combine(errors)
}
102 changes: 101 additions & 1 deletion receiver/statsdreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package statsdreceiver

import (
"fmt"
"path"
"testing"
"time"
Expand All @@ -25,6 +26,8 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtest"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/protocol"
)

func TestLoadConfig(t *testing.T) {
Expand Down Expand Up @@ -55,6 +58,103 @@ func TestLoadConfig(t *testing.T) {
Endpoint: "localhost:12345",
Transport: "custom_transport",
},
AggregationInterval: 70 * time.Second,
AggregationInterval: 70 * time.Second,
TimerHistogramMapping: []protocol.TimerHistogramMapping{{Match: "*", StatsdType: "histogram", ObserverType: "gauge"}, {Match: "*", StatsdType: "timing", ObserverType: "gauge"}},
}, r1)
}

func TestValidate(t *testing.T) {
type test struct {
name string
cfg *Config
expectedErr string
}

const (
negativeAggregationIntervalErr = "aggregation_interval must be a positive duration"
noObjectNameErr = "must specify object name for all TimerHistogramMappings"
matchNotSupportErr = "match is not supported: %s"
statsdTypeNotSupportErr = "statsd_type is not supported: %s"
observerTypeNotSupportErr = "observer_type is not supported: %s"
)

tests := []test{
{
name: "negativeAggregationInterval",
cfg: &Config{
AggregationInterval: -1,
TimerHistogramMapping: []protocol.TimerHistogramMapping{
{Match: "*", StatsdType: "timing", ObserverType: "gauge"},
},
},
expectedErr: negativeAggregationIntervalErr,
},
{
name: "emptyMatch",
cfg: &Config{
AggregationInterval: 10,
TimerHistogramMapping: []protocol.TimerHistogramMapping{
{StatsdType: "timing", ObserverType: "gauge"},
},
},
expectedErr: noObjectNameErr,
},
{
name: "emptyStatsdType",
cfg: &Config{
AggregationInterval: 10,
TimerHistogramMapping: []protocol.TimerHistogramMapping{
{Match: "*", ObserverType: "gauge"},
},
},
expectedErr: noObjectNameErr,
},
{
name: "emptyObserverType",
cfg: &Config{
AggregationInterval: 10,
TimerHistogramMapping: []protocol.TimerHistogramMapping{
{Match: "*", StatsdType: "timing"},
},
},
expectedErr: noObjectNameErr,
},
{
name: "MatchNotSupport",
cfg: &Config{
AggregationInterval: 10,
TimerHistogramMapping: []protocol.TimerHistogramMapping{
{Match: "aaa", StatsdType: "timing", ObserverType: "gauge"},
},
},
expectedErr: fmt.Sprintf(matchNotSupportErr, "aaa"),
},
{
name: "StatsdTypeNotSupport",
cfg: &Config{
AggregationInterval: 10,
TimerHistogramMapping: []protocol.TimerHistogramMapping{
{Match: "*", StatsdType: "abc", ObserverType: "gauge"},
},
},
expectedErr: fmt.Sprintf(statsdTypeNotSupportErr, "abc"),
},
{
name: "ObserverTypeNotSupport",
cfg: &Config{
AggregationInterval: 10,
TimerHistogramMapping: []protocol.TimerHistogramMapping{
{Match: "*", StatsdType: "timer", ObserverType: "gauge1"},
},
},
expectedErr: fmt.Sprintf(observerTypeNotSupportErr, "gauge1"),
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
require.EqualError(t, test.cfg.validate(), test.expectedErr)
})
}

}
11 changes: 9 additions & 2 deletions receiver/statsdreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver/receiverhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/protocol"
)

const (
Expand Down Expand Up @@ -53,8 +55,9 @@ func createDefaultConfig() config.Receiver {
Endpoint: defaultBindEndpoint,
Transport: defaultTransport,
},
AggregationInterval: defaultAggregationInterval,
EnableMetricType: defaultEnableMetricType,
AggregationInterval: defaultAggregationInterval,
EnableMetricType: defaultEnableMetricType,
TimerHistogramMapping: []protocol.TimerHistogramMapping{{Match: "*", StatsdType: "timer", ObserverType: "gauge"}, {Match: "*", StatsdType: "histogram", ObserverType: "gauge"}},
}
}

Expand All @@ -65,5 +68,9 @@ func createMetricsReceiver(
consumer consumer.Metrics,
) (component.MetricsReceiver, error) {
c := cfg.(*Config)
err := c.validate()
if err != nil {
return nil, err
}
return New(params.Logger, *c, consumer)
}
20 changes: 20 additions & 0 deletions receiver/statsdreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/protocol"
)

func TestCreateDefaultConfig(t *testing.T) {
Expand All @@ -42,6 +44,24 @@ func TestCreateReceiver(t *testing.T) {
assert.NotNil(t, tReceiver, "receiver creation failed")
}

func TestCreateReceiverWithConfigErr(t *testing.T) {
cfg := &Config{
AggregationInterval: -1,
TimerHistogramMapping: []protocol.TimerHistogramMapping{
{Match: "*", StatsdType: "timing", ObserverType: "gauge"},
},
}
receiver, err := createMetricsReceiver(
context.Background(),
component.ReceiverCreateParams{Logger: zap.NewNop()},
cfg,
consumertest.NewNop(),
)
assert.Error(t, err, "aggregation_interval must be a positive duration")
assert.Nil(t, receiver)

}

func TestCreateMetricsReceiverWithNilConsumer(t *testing.T) {
receiver, err := createMetricsReceiver(
context.Background(),
Expand Down
2 changes: 1 addition & 1 deletion receiver/statsdreceiver/protocol/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

// Parser is something that can map input StatsD strings to OTLP Metric representations.
type Parser interface {
Initialize(enableMetricType bool) error
Initialize(enableMetricType bool, sendTimerHistogram []TimerHistogramMapping) error
GetMetrics() pdata.Metrics
Aggregate(line string) error
}
Loading

0 comments on commit 3330d4f

Please sign in to comment.