feat(inputs.prometheus): pod_annotation_template and pod_label_template #10946

Closed
54 changes: 36 additions & 18 deletions plugins/inputs/prometheus/README.md
@@ -10,50 +10,56 @@ in Prometheus format.
[[inputs.prometheus]]
## An array of urls to scrape metrics from.
urls = ["http://localhost:9100/metrics"]

## Metric version controls the mapping from Prometheus metrics into
## Telegraf metrics. When using the prometheus_client output, use the same
## value in both plugins to ensure metrics are round-tripped without
## modification.
##
## example: metric_version = 1;
## example: metric_version = 1;
## metric_version = 2; recommended version
# metric_version = 1

## Url tag name (tag containing scraped url. optional, default is "url")
# url_tag = "url"

## Whether the timestamp of the scraped metrics will be ignored.
## If set to true, the gather time will be used.
# ignore_timestamp = false

## An array of Kubernetes services to scrape metrics from.
# kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"]

## Kubernetes config file to create client from.
# kube_config = "/path/to/kubernetes.config"

## Scrape Kubernetes pods for the following prometheus annotations:
## - prometheus.io/scrape: Enable scraping for this pod
## - prometheus.io/scheme: If the metrics endpoint is secured then you will need to
## set this to 'https' & most likely set the tls config.
## - prometheus.io/path: If the metrics path is not /metrics, define it with this annotation.
## - prometheus.io/port: If port is not 9102 use this annotation
# monitor_kubernetes_pods = true

## Get the list of pods to scrape with either the scope of
## - cluster: the kubernetes watch api (default, no need to specify)
## - node: the local cadvisor api; for scalability. Note that the config node_ip or the environment variable NODE_IP must be set to the host IP.
# pod_scrape_scope = "cluster"

## Only for node scrape scope: node IP of the node that telegraf is running on.
## Either this config or the environment variable NODE_IP must be set.
# node_ip = "10.180.1.1"

## Only for node scrape scope: interval in seconds for how often to get updated pod list for scraping.
## Default is 60 seconds.
# pod_scrape_interval = 60


## Pod label & annotation tag template
## Default is "{{ . }}"
## Useful to glob match labels & annotations in "tagexclude" or "taginclude"
# pod_label_template = "{{ . }}"
# pod_annotation_template = "{{ . }}"
Comment on lines +57 to +61
Member

Is it really necessary to make this a template, or can we get away with defining a prefix? I'm asking because a template adds complexity that might not be required. Furthermore, it's not intuitive to use a template, which is usually used to generate output, for parsing.


## Restricts Kubernetes monitoring to a single namespace
## ex: monitor_kubernetes_pods_namespace = "default"
# monitor_kubernetes_pods_namespace = ""
@@ -63,7 +69,7 @@ in Prometheus format.
# eg. To scrape pods on a specific node
# kubernetes_field_selector = "spec.nodeName=$HOSTNAME"

# cache refresh interval to set the interval for re-sync of pods list.
# cache refresh interval to set the interval for re-sync of pods list.
# Default is 60 minutes.
# cache_refresh_interval = 60

@@ -79,25 +85,25 @@ in Prometheus format.
# url = 'http://{{if ne .ServiceAddress ""}}{{.ServiceAddress}}{{else}}{{.Address}}{{end}}:{{.ServicePort}}/{{with .ServiceMeta.metrics_path}}{{.}}{{else}}metrics{{end}}'
# [inputs.prometheus.consul.query.tags]
# host = "{{.Node}}"

## Use bearer token for authorization. ('bearer_token' takes priority)
# bearer_token = "/path/to/bearer/token"
## OR
# bearer_token_string = "abc_123"

## HTTP Basic Authentication username and password. ('bearer_token' and
## 'bearer_token_string' take priority)
# username = ""
# password = ""

## Specify timeout duration for slower prometheus clients (default is 3s)
# response_timeout = "3s"

## Optional TLS Config
# tls_ca = /path/to/cafile
# tls_cert = /path/to/certfile
# tls_key = /path/to/keyfile

## Use TLS but skip chain & host verification
# insecure_skip_verify = false
```
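For readers unfamiliar with Go templates, here is a minimal sketch of how a key template such as `pod.label/{{ . }}` would rename a tag key. It assumes, as the diff suggests, that the tag key is passed as the template's dot value. The helper name `renderKey` is hypothetical and is not part of the plugin.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// renderKey parses tmplText and executes it with the tag key as the dot value.
// Hypothetical helper for illustration only; not part of the plugin.
func renderKey(tmplText, key string) (string, error) {
	tmpl, err := template.New("key").Parse(tmplText)
	if err != nil {
		return "", err
	}
	var b strings.Builder
	if err := tmpl.Execute(&b, key); err != nil {
		return "", err
	}
	return b.String(), nil
}

func main() {
	// "{{ . }}" is the proposed default and leaves the key unchanged.
	for _, tmplText := range []string{"{{ . }}", "pod.label/{{ . }}"} {
		out, err := renderKey(tmplText, "kubernetes.io/name")
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(tmplText, "->", out)
	}
}
```

With the default template the key passes through untouched, so existing configurations would keep their current tag names.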
@@ -135,7 +141,7 @@ env:
valueFrom:
fieldRef:
fieldPath: status.hostIP
```
```

If using node level scrape scope, `pod_scrape_interval` specifies how often (in seconds) the pod list for scraping should be updated. If not specified, the default is 60 seconds.

@@ -179,6 +185,18 @@ metadata:
name: telegraf-k8s-{{ .Release.Name }}
```

### Excluding Annotation & Label Tags

```diff
[[inputs.prometheus]]
metric_version = 2
monitor_kubernetes_pods = true
pod_scrape_scope = "node"
+ pod_label_template = 'pod.label/{{ . }}'
+ pod_annotation_template = 'pod.annotation/{{ . }}'
Comment on lines +195 to +196
Member

See above, we could cover this with just

  pod_label_prefix = 'pod.label'
  pod_annotation_prefix = 'pod.annotation'

+ tagexclude = ["pod.annotation/*", "pod.label/*"]
```
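The prefix alternative raised in the review could be sketched roughly as below. This is only an illustration under assumptions: `pod_label_prefix` and `pod_annotation_prefix` are the reviewer's proposed option names, not existing options, and the `/` separator is assumed from the `tagexclude` globs in the example. The helper `prefixKey` is hypothetical.

```go
package main

import "fmt"

// prefixKey joins a configured prefix and a tag key.
// The "/" separator is an assumption taken from the tagexclude globs above;
// the review comment does not say how prefix and key would be joined.
func prefixKey(prefix, key string) string {
	if prefix == "" {
		return key // no prefix configured: keep the key unchanged
	}
	return prefix + "/" + key
}

func main() {
	labels := map[string]string{"kubernetes.io/name": "app"}
	annotations := map[string]string{"prometheus.io/scrape": "true"}

	tags := map[string]string{}
	for k, v := range annotations {
		tags[prefixKey("pod.annotation", k)] = v
	}
	for k, v := range labels {
		tags[prefixKey("pod.label", k)] = v
	}
	for k, v := range tags {
		fmt.Printf("%s=%s\n", k, v)
	}
}
```

Compared with a template, a prefix needs no parsing step and cannot fail at render time, at the cost of supporting only one naming scheme.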

### Consul Service Discovery

Enabling this option and configuring consul `agent` url will allow the plugin to query
4 changes: 2 additions & 2 deletions plugins/inputs/prometheus/consul.go
@@ -188,14 +188,14 @@ func (p *Prometheus) getConsulServiceURL(q *ConsulQuery, s *api.CatalogService)
return nil, err
}

extraTags := make(map[string]string)
extraTags := make(map[string]Tag)
for tagName, tagTemplate := range q.serviceExtraTagsTemplate {
buffer.Reset()
err = tagTemplate.Execute(&buffer, s)
if err != nil {
return nil, err
}
extraTags[tagName] = buffer.String()
extraTags[tagName] = Tag{Value: buffer.String()}
Member

Why is this necessary?

}

p.Log.Debugf("Will scrape metrics from Consul Service %s", serviceURL.String())
12 changes: 6 additions & 6 deletions plugins/inputs/prometheus/kubernetes.go
@@ -368,16 +368,16 @@ func registerPod(pod *corev1.Pod, p *Prometheus) {
}

p.Log.Debugf("will scrape metrics from %q", targetURL.String())
tags := map[string]Tag{}
tags["pod_name"] = Tag{Value: pod.Name}
tags["namespace"] = Tag{Value: pod.Namespace}
// add annotation as metrics tags
tags := pod.Annotations
if tags == nil {
tags = map[string]string{}
for k, v := range pod.Annotations {
tags[k] = Tag{Value: v, Template: p.podAnnotationTmpl}
}
tags["pod_name"] = pod.Name
tags["namespace"] = pod.Namespace
// add labels as metrics tags
for k, v := range pod.Labels {
tags[k] = v
tags[k] = Tag{Value: v, Template: p.podLabelTmpl}
Comment on lines +371 to +380
Member

Wouldn't it be better to directly apply the modification here instead of looping this through all the code?

}
podURL := p.AddressToURL(targetURL, targetURL.Hostname())

73 changes: 73 additions & 0 deletions plugins/inputs/prometheus/kubernetes_test.go
@@ -1,6 +1,7 @@
package prometheus

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
@@ -175,6 +176,78 @@ func TestInvalidFieldSelector(t *testing.T) {
require.NotEqual(t, err, nil)
}

func TestPodLabelAnnotationTemplateOmitted(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}}
err := prom.Init()
require.NoError(t, err)

promScrapeKey := "prometheus.io/scrape"
kubernetesNameKey := "kubernetes.io/name"

p := pod()
p.Annotations = map[string]string{promScrapeKey: "true"}
p.Labels = map[string]string{kubernetesNameKey: "app"}
registerPod(p, prom)
require.Equal(t, 1, len(prom.kubernetesPods))

var pods []URLAndAddress
for _, v := range prom.kubernetesPods {
pods = append(pods, v)
}

kubernetesNameTag := pods[0].Tags[kubernetesNameKey]
require.NotEmpty(t, kubernetesNameTag)
promScrapeTag := pods[0].Tags[promScrapeKey]
require.NotEmpty(t, promScrapeTag)

kubernetesNameKeyRendered, err := kubernetesNameTag.RenderKey(kubernetesNameKey)
require.NoError(t, err)
require.Equal(t, kubernetesNameKey, kubernetesNameKeyRendered)

promScrapeKeyRendered, err := promScrapeTag.RenderKey(promScrapeKey)
require.NoError(t, err)
require.Equal(t, promScrapeKey, promScrapeKeyRendered)
}

func TestPodLabelAnnotationTemplate(t *testing.T) {
prom := &Prometheus{
Log: testutil.Logger{},
PodLabelTemplate: "pod.label/{{ . }}",
PodAnnotationTemplate: "pod.annotation/{{ . }}",
}
err := prom.Init()
require.NoError(t, err)
require.NotEmpty(t, prom.podLabelTmpl)
require.NotEmpty(t, prom.podAnnotationTmpl)

promScrapeKey := "prometheus.io/scrape"
kubernetesNameKey := "kubernetes.io/name"

p := pod()
p.Annotations = map[string]string{promScrapeKey: "true"}
p.Labels = map[string]string{kubernetesNameKey: "app"}
registerPod(p, prom)
require.Equal(t, 1, len(prom.kubernetesPods))

var pods []URLAndAddress
for _, v := range prom.kubernetesPods {
pods = append(pods, v)
}

kubernetesNameTag := pods[0].Tags[kubernetesNameKey]
require.NotEmpty(t, kubernetesNameTag)
promScrapeTag := pods[0].Tags[promScrapeKey]
require.NotEmpty(t, promScrapeTag)

kubernetesNameKeyRendered, err := kubernetesNameTag.RenderKey(kubernetesNameKey)
require.NoError(t, err)
require.Equal(t, fmt.Sprintf("pod.label/%s", kubernetesNameKey), kubernetesNameKeyRendered)

promScrapeKeyRendered, err := promScrapeTag.RenderKey(promScrapeKey)
require.NoError(t, err)
require.Equal(t, fmt.Sprintf("pod.annotation/%s", promScrapeKey), promScrapeKeyRendered)
}

func pod() *corev1.Pod {
p := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{}, Status: corev1.PodStatus{}, Spec: corev1.PodSpec{}}
p.Status.PodIP = "127.0.0.1"
81 changes: 66 additions & 15 deletions plugins/inputs/prometheus/prometheus.go
@@ -11,6 +11,7 @@ import (
"os"
"strings"
"sync"
"text/template"
"time"

"k8s.io/apimachinery/pkg/fields"
@@ -69,15 +70,19 @@ type Prometheus struct {
headers map[string]string

// Should we scrape Kubernetes services for prometheus annotations
MonitorPods bool `toml:"monitor_kubernetes_pods"`
PodScrapeScope string `toml:"pod_scrape_scope"`
NodeIP string `toml:"node_ip"`
PodScrapeInterval int `toml:"pod_scrape_interval"`
PodNamespace string `toml:"monitor_kubernetes_pods_namespace"`
lock sync.Mutex
kubernetesPods map[string]URLAndAddress
cancel context.CancelFunc
wg sync.WaitGroup
MonitorPods bool `toml:"monitor_kubernetes_pods"`
PodScrapeScope string `toml:"pod_scrape_scope"`
PodLabelTemplate string `toml:"pod_label_template"`
PodAnnotationTemplate string `toml:"pod_annotation_template"`
NodeIP string `toml:"node_ip"`
PodScrapeInterval int `toml:"pod_scrape_interval"`
PodNamespace string `toml:"monitor_kubernetes_pods_namespace"`
lock sync.Mutex
kubernetesPods map[string]URLAndAddress
cancel context.CancelFunc
wg sync.WaitGroup
podLabelTmpl *template.Template
podAnnotationTmpl *template.Template

// Only for monitor_kubernetes_pods=true and pod_scrape_scope="node"
podLabelSelector labels.Selector
@@ -128,6 +133,24 @@ func (p *Prometheus) Init() error {
p.Log.Infof("Using the label selector: %v and field selector: %v", p.podLabelSelector, p.podFieldSelector)
}

// Configure pod label template
if p.PodLabelTemplate != "" {
podLabelTmpl, err := template.New("pod_label_template").Parse(p.PodLabelTemplate)
if err != nil {
return err
}
p.podLabelTmpl = podLabelTmpl
}

// Configure pod annotation template
if p.PodAnnotationTemplate != "" {
podAnnotationTmpl, err := template.New("pod_annotation_template").Parse(p.PodAnnotationTemplate)
if err != nil {
return err
}
p.podAnnotationTmpl = podAnnotationTmpl
}

return nil
}
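
The two parse blocks in `Init` are near-identical, and a malformed template only surfaces as a bare parse error. A possible refactor, sketched here as an assumption rather than as what the PR does, is a small helper that parses once and names the offending option in the error. `parseKeyTemplate` is a hypothetical name.

```go
package main

import (
	"fmt"
	"text/template"
)

// parseKeyTemplate parses a tag-key template and wraps any parse error with
// the option name. An empty text yields a nil template, meaning "no renaming".
// Hypothetical helper; the PR inlines this logic in Init instead.
func parseKeyTemplate(option, text string) (*template.Template, error) {
	if text == "" {
		return nil, nil
	}
	tmpl, err := template.New(option).Parse(text)
	if err != nil {
		return nil, fmt.Errorf("parsing %s: %w", option, err)
	}
	return tmpl, nil
}

func main() {
	// A well-formed template parses cleanly.
	if _, err := parseKeyTemplate("pod_label_template", "pod.label/{{ . }}"); err != nil {
		fmt.Println("unexpected error:", err)
	}

	// An unclosed action is rejected at parse time, before any scraping starts.
	if _, err := parseKeyTemplate("pod_annotation_template", "pod.annotation/{{ . "); err != nil {
		fmt.Println("rejected:", err)
	}
}
```

Failing in `Init` means a bad template stops Telegraf at startup instead of producing log noise on every gather.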

@@ -150,11 +173,31 @@ func (p *Prometheus) AddressToURL(u *url.URL, address string) *url.URL {
return reconstructedURL
}

type Tag struct {
Value string
Template *template.Template
}

func (t *Tag) RenderKey(k string) (string, error) {
if t.Template != nil {
var buffer strings.Builder
err := t.Template.Execute(&buffer, k)

if err != nil {
return k, fmt.Errorf("Failed to execute tag key template, skipping modification for `%s`. Error: %v", k, err)
}

return buffer.String(), nil
}

return k, nil
}

type URLAndAddress struct {
OriginalURL *url.URL
URL *url.URL
Address string
Tags map[string]string
Tags map[string]Tag
}

func (p *Prometheus) GetAllURLs() (map[string]URLAndAddress, error) {
@@ -354,7 +397,13 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error
tags["address"] = u.Address
}
for k, v := range u.Tags {
tags[k] = v
k, err := v.RenderKey(k)

if err != nil {
p.Log.Error(err)
}

tags[k] = v.Value
}

switch metric.Type() {
@@ -431,10 +480,12 @@ func (p *Prometheus) Stop() {
func init() {
inputs.Add("prometheus", func() telegraf.Input {
return &Prometheus{
ResponseTimeout: config.Duration(time.Second * 3),
kubernetesPods: map[string]URLAndAddress{},
consulServices: map[string]URLAndAddress{},
URLTag: "url",
ResponseTimeout: config.Duration(time.Second * 3),
kubernetesPods: map[string]URLAndAddress{},
consulServices: map[string]URLAndAddress{},
PodLabelTemplate: "{{ . }}",
PodAnnotationTemplate: "{{ . }}",
URLTag: "url",
}
})
}