Skip to content

Commit

Permalink
feat(prometheus): Allow to specify metric type (#13874)
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan authored Sep 11, 2023
1 parent dd5c221 commit 84b3b58
Show file tree
Hide file tree
Showing 12 changed files with 279 additions and 36 deletions.
3 changes: 3 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/influxdata/telegraf/plugins/processors"
"github.com/influxdata/telegraf/plugins/serializers"
_ "github.com/influxdata/telegraf/plugins/serializers/all" // Blank import to have all serializers for testing
promserializer "github.com/influxdata/telegraf/plugins/serializers/prometheus"
"github.com/influxdata/telegraf/testutil"
)

Expand Down Expand Up @@ -612,6 +613,7 @@ func TestConfig_SerializerInterfaceNewFormat(t *testing.T) {
// Ignore all unexported fields and fields not relevant for functionality
options := []cmp.Option{
cmpopts.IgnoreUnexported(stype),
cmpopts.IgnoreUnexported(reflect.Indirect(reflect.ValueOf(promserializer.MetricTypes{})).Interface()),
cmpopts.IgnoreTypes(sync.Mutex{}, regexp.Regexp{}),
cmpopts.IgnoreInterfaces(struct{ telegraf.Logger }{}),
}
Expand Down Expand Up @@ -703,6 +705,7 @@ func TestConfig_SerializerInterfaceOldFormat(t *testing.T) {
// Ignore all unexported fields and fields not relevant for functionality
options := []cmp.Option{
cmpopts.IgnoreUnexported(stype),
cmpopts.IgnoreUnexported(reflect.Indirect(reflect.ValueOf(promserializer.MetricTypes{})).Interface()),
cmpopts.IgnoreTypes(sync.Mutex{}, regexp.Regexp{}),
cmpopts.IgnoreInterfaces(struct{ telegraf.Logger }{}),
}
Expand Down
6 changes: 6 additions & 0 deletions plugins/outputs/prometheus_client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.

## Export metric collection time.
# export_timestamp = false

## Specify the metric type explicitly.
## This overrides the metric-type of the Telegraf metric. Globbing is allowed.
# [outputs.prometheus_client.prometheus_metric_types]
# counter = []
# gauge = []
```

## Metrics
Expand Down
45 changes: 31 additions & 14 deletions plugins/outputs/prometheus_client/prometheus_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/influxdata/telegraf/plugins/outputs"
v1 "github.com/influxdata/telegraf/plugins/outputs/prometheus_client/v1"
v2 "github.com/influxdata/telegraf/plugins/outputs/prometheus_client/v2"
serializer "github.com/influxdata/telegraf/plugins/serializers/prometheus"
)

//go:embed sample.conf
Expand All @@ -43,18 +44,19 @@ type Collector interface {
}

type PrometheusClient struct {
Listen string `toml:"listen"`
ReadTimeout config.Duration `toml:"read_timeout"`
WriteTimeout config.Duration `toml:"write_timeout"`
MetricVersion int `toml:"metric_version"`
BasicUsername string `toml:"basic_username"`
BasicPassword string `toml:"basic_password"`
IPRange []string `toml:"ip_range"`
ExpirationInterval config.Duration `toml:"expiration_interval"`
Path string `toml:"path"`
CollectorsExclude []string `toml:"collectors_exclude"`
StringAsLabel bool `toml:"string_as_label"`
ExportTimestamp bool `toml:"export_timestamp"`
Listen string `toml:"listen"`
ReadTimeout config.Duration `toml:"read_timeout"`
WriteTimeout config.Duration `toml:"write_timeout"`
MetricVersion int `toml:"metric_version"`
BasicUsername string `toml:"basic_username"`
BasicPassword string `toml:"basic_password"`
IPRange []string `toml:"ip_range"`
ExpirationInterval config.Duration `toml:"expiration_interval"`
Path string `toml:"path"`
CollectorsExclude []string `toml:"collectors_exclude"`
StringAsLabel bool `toml:"string_as_label"`
ExportTimestamp bool `toml:"export_timestamp"`
TypeMappings serializer.MetricTypes `toml:"metric_types"`
tlsint.ServerConfig

Log telegraf.Logger `toml:"-"`
Expand Down Expand Up @@ -96,17 +98,32 @@ func (p *PrometheusClient) Init() error {
}
}

if err := p.TypeMappings.Init(); err != nil {
return err
}

switch p.MetricVersion {
default:
fallthrough
case 1:
p.collector = v1.NewCollector(time.Duration(p.ExpirationInterval), p.StringAsLabel, p.ExportTimestamp, p.Log)
p.collector = v1.NewCollector(
time.Duration(p.ExpirationInterval),
p.StringAsLabel,
p.ExportTimestamp,
p.TypeMappings,
p.Log,
)
err := registry.Register(p.collector)
if err != nil {
return err
}
case 2:
p.collector = v2.NewCollector(time.Duration(p.ExpirationInterval), p.StringAsLabel, p.ExportTimestamp)
p.collector = v2.NewCollector(
time.Duration(p.ExpirationInterval),
p.StringAsLabel,
p.ExportTimestamp,
p.TypeMappings,
)
err := registry.Register(p.collector)
if err != nil {
return err
Expand Down
69 changes: 61 additions & 8 deletions plugins/outputs/prometheus_client/prometheus_client_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/influxdata/telegraf"
inputs "github.com/influxdata/telegraf/plugins/inputs/prometheus"
"github.com/influxdata/telegraf/plugins/serializers/prometheus"
"github.com/influxdata/telegraf/testutil"
)

Expand Down Expand Up @@ -268,24 +269,76 @@ rpc_duration_seconds{quantile="0.9"} 9001
rpc_duration_seconds{quantile="0.99"} 76656
rpc_duration_seconds_sum 1.7560473e+07
rpc_duration_seconds_count 2693
`),
},
{
name: "prometheus untyped forced to counter",
output: &PrometheusClient{
Listen: ":0",
MetricVersion: 1,
CollectorsExclude: []string{"gocollector", "process"},
Path: "/metrics",
TypeMappings: prometheus.MetricTypes{Counter: []string{"cpu_time_idle"}},
Log: logger,
},
metrics: []telegraf.Metric{
testutil.MustMetric(
"cpu_time_idle",
map[string]string{
"host": "example.org",
},
map[string]interface{}{
"value": 42,
},
time.Unix(0, 0),
),
},
expected: []byte(`
# HELP cpu_time_idle Telegraf collected metric
# TYPE cpu_time_idle counter
cpu_time_idle{host="example.org"} 42
`),
},
{
name: "prometheus untyped forced to gauge",
output: &PrometheusClient{
Listen: ":0",
MetricVersion: 1,
CollectorsExclude: []string{"gocollector", "process"},
Path: "/metrics",
TypeMappings: prometheus.MetricTypes{Gauge: []string{"cpu_time_idle"}},
Log: logger,
},
metrics: []telegraf.Metric{
testutil.MustMetric(
"cpu_time_idle",
map[string]string{
"host": "example.org",
},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
},
expected: []byte(`
# HELP cpu_time_idle Telegraf collected metric
# TYPE cpu_time_idle gauge
cpu_time_idle{host="example.org"} 42
`),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.output.Init()
require.NoError(t, err)
require.NoError(t, tt.output.Init())

err = tt.output.Connect()
require.NoError(t, err)
require.NoError(t, tt.output.Connect())

defer func() {
err := tt.output.Close()
require.NoError(t, err)
require.NoError(t, tt.output.Close())
}()

err = tt.output.Write(tt.metrics)
require.NoError(t, err)
require.NoError(t, tt.output.Write(tt.metrics))

resp, err := http.Get(tt.output.URL())
require.NoError(t, err)
Expand Down
70 changes: 61 additions & 9 deletions plugins/outputs/prometheus_client/prometheus_client_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/influxdata/telegraf"
inputs "github.com/influxdata/telegraf/plugins/inputs/prometheus"
"github.com/influxdata/telegraf/plugins/serializers/prometheus"
"github.com/influxdata/telegraf/testutil"
)

Expand Down Expand Up @@ -299,24 +300,75 @@ cpu_usage_idle_count{cpu="cpu1"} 20
cpu_usage_idle_bucket{cpu="cpu1",le="+Inf"} 20
cpu_usage_idle_sum{cpu="cpu1"} 2000
cpu_usage_idle_count{cpu="cpu1"} 20
`),
},
{
name: "untyped forced to counter",
output: &PrometheusClient{
Listen: ":0",
MetricVersion: 2,
CollectorsExclude: []string{"gocollector", "process"},
Path: "/metrics",
TypeMappings: prometheus.MetricTypes{Counter: []string{"cpu_time_idle"}},
Log: logger,
},
metrics: []telegraf.Metric{
testutil.MustMetric(
"prometheus",
map[string]string{
"host": "example.org",
},
map[string]interface{}{
"cpu_time_idle": 42,
},
time.Unix(0, 0),
),
},
expected: []byte(`
# HELP cpu_time_idle Telegraf collected metric
# TYPE cpu_time_idle counter
cpu_time_idle{host="example.org"} 42
`),
},
{
name: "untyped forced to gauge",
output: &PrometheusClient{
Listen: ":0",
MetricVersion: 2,
CollectorsExclude: []string{"gocollector", "process"},
Path: "/metrics",
TypeMappings: prometheus.MetricTypes{Gauge: []string{"cpu_time_idle"}},
Log: logger,
},
metrics: []telegraf.Metric{
testutil.MustMetric(
"prometheus",
map[string]string{
"host": "example.org",
},
map[string]interface{}{
"cpu_time_idle": 42.0,
},
time.Unix(0, 0),
),
},
expected: []byte(`
# HELP cpu_time_idle Telegraf collected metric
# TYPE cpu_time_idle gauge
cpu_time_idle{host="example.org"} 42
`),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.output.Init()
require.NoError(t, err)

err = tt.output.Connect()
require.NoError(t, err)
require.NoError(t, tt.output.Init())
require.NoError(t, tt.output.Connect())

defer func() {
err := tt.output.Close()
require.NoError(t, err)
require.NoError(t, tt.output.Close())
}()

err = tt.output.Write(tt.metrics)
require.NoError(t, err)
require.NoError(t, tt.output.Write(tt.metrics))

resp, err := http.Get(tt.output.URL())
require.NoError(t, err)
Expand Down
6 changes: 6 additions & 0 deletions plugins/outputs/prometheus_client/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,9 @@

## Export metric collection time.
# export_timestamp = false

## Specify the metric type explicitly.
## This overrides the metric-type of the Telegraf metric. Globbing is allowed.
# [outputs.prometheus_client.prometheus_metric_types]
# counter = []
# gauge = []
13 changes: 11 additions & 2 deletions plugins/outputs/prometheus_client/v1/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,26 @@ type Collector struct {
ExpirationInterval time.Duration
StringAsLabel bool
ExportTimestamp bool
TypeMapping serializer.MetricTypes
Log telegraf.Logger

sync.Mutex
fam map[string]*MetricFamily
expireTicker *time.Ticker
}

func NewCollector(expire time.Duration, stringsAsLabel bool, exportTimestamp bool, logger telegraf.Logger) *Collector {
func NewCollector(
expire time.Duration,
stringsAsLabel bool,
exportTimestamp bool,
typeMapping serializer.MetricTypes,
logger telegraf.Logger,
) *Collector {
c := &Collector{
ExpirationInterval: expire,
StringAsLabel: stringsAsLabel,
ExportTimestamp: exportTimestamp,
TypeMapping: typeMapping,
Log: logger,
fam: make(map[string]*MetricFamily),
}
Expand Down Expand Up @@ -176,9 +184,10 @@ func (c *Collector) addMetricFamily(point telegraf.Metric, sample *Sample, mname
var fam *MetricFamily
var ok bool
if fam, ok = c.fam[mname]; !ok {
pointType := c.TypeMapping.DetermineType(mname, point)
fam = &MetricFamily{
Samples: make(map[SampleID]*Sample),
TelegrafValueType: point.Type(),
TelegrafValueType: pointType,
LabelSet: make(map[string]int),
}
c.fam[mname] = fam
Expand Down
8 changes: 7 additions & 1 deletion plugins/outputs/prometheus_client/v2/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,16 @@ type Collector struct {
coll *serializer.Collection
}

func NewCollector(expire time.Duration, stringsAsLabel bool, exportTimestamp bool) *Collector {
func NewCollector(
expire time.Duration,
stringsAsLabel bool,
exportTimestamp bool,
typeMapping serializer.MetricTypes,
) *Collector {
cfg := serializer.FormatConfig{
StringAsLabel: stringsAsLabel,
ExportTimestamp: exportTimestamp,
TypeMappings: typeMapping,
}

return &Collector{
Expand Down
6 changes: 6 additions & 0 deletions plugins/serializers/prometheus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ reporting others every bucket/quantile will continue to exist.
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "prometheus"

## Specify the metric type explicitly.
## This overrides the metric-type of the Telegraf metric. Globbing is allowed.
[outputs.file.prometheus_metric_types]
counter = []
gauge = []
```

### Metrics
Expand Down
3 changes: 2 additions & 1 deletion plugins/serializers/prometheus/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,11 @@ func (c *Collection) Add(metric telegraf.Metric, now time.Time) {
if !ok {
continue
}
metricType := c.config.TypeMappings.DetermineType(metricName, metric)

family := MetricFamily{
Name: metricName,
Type: metric.Type(),
Type: metricType,
}

entry, ok := c.Entries[family]
Expand Down
Loading

0 comments on commit 84b3b58

Please sign in to comment.