Prometheus [Input] plugin - Optimizing for bigger kubernetes clusters (500+ pods) when scraping thru 'monitor_kubernetes_pods' #8762

Merged
merged 26 commits into from
Mar 8, 2021
bdcb7ea
more cleanup and error handling
gracewehner Jan 13, 2021
b65a1a4
add cadvisor option for scalable telegraf scraping
gracewehner Jan 14, 2021
24cda5d
comments and cleanup
gracewehner Jan 13, 2021
d08750f
logging and executable
gracewehner Jan 19, 2021
76af87b
use k8s package for selector parsing and matching
gracewehner Jan 21, 2021
4e20fef
fixes
gracewehner Jan 21, 2021
2436923
cleanup and error handling for selector parsing
gracewehner Jan 22, 2021
9bda346
more cleanup
gracewehner Jan 22, 2021
4ea31a3
Delete telegraf.zip
gracewehner Jan 25, 2021
1255e25
Update README.md
gracewehner Jan 25, 2021
7d29972
Fix whitespace
gracewehner Jan 25, 2021
673b181
More error handling
gracewehner Jan 25, 2021
4e4141c
Add tests
gracewehner Jan 25, 2021
797c07f
Update README.md
gracewehner Jan 27, 2021
1cfec11
fix test and dependencies
gracewehner Jan 27, 2021
cdc439c
add licenses
gracewehner Jan 27, 2021
34fb243
Add error handling for NODE_IP env var
gracewehner Jan 27, 2021
353e008
Add note about NODE_IP
gracewehner Jan 27, 2021
768e899
go mod tidy changes
gracewehner Jan 27, 2021
49336fa
remove duplicate line
gracewehner Jan 27, 2021
79e0e71
make fmt
gracewehner Jan 27, 2021
a100a6b
addressed PR comments
gracewehner Mar 3, 2021
d662caf
don't change all the whitespace in the readme
gracewehner Mar 3, 2021
d367504
add config for node ip and use env var as default
gracewehner Mar 5, 2021
1a80d8e
go fmt
gracewehner Mar 5, 2021
f97278c
revert readme whitespace changed from go fmt
gracewehner Mar 5, 2021
2 changes: 2 additions & 0 deletions docs/LICENSE_OF_DEPENDENCIES.md
@@ -179,6 +179,8 @@ following works:
- gopkg.in/tomb.v2 [BSD 3-Clause Clear License](https://github.com/go-tomb/tomb/blob/v2/LICENSE)
- gopkg.in/yaml.v2 [Apache License 2.0](https://github.com/go-yaml/yaml/blob/v2.2.2/LICENSE)
- gopkg.in/yaml.v3 [Apache License 2.0](https://github.com/go-yaml/yaml/blob/v3/LICENSE)
- k8s.io/apimachinery [Apache License 2.0](https://github.com/kubernetes/apimachinery/blob/master/LICENSE)
- k8s.io/klog [Apache License 2.0](https://github.com/kubernetes/klog/blob/master/LICENSE)
- modernc.org/libc [BSD 3-Clause "New" or "Revised" License](https://gitlab.com/cznic/libc/-/blob/master/LICENSE)
- modernc.org/memory [BSD 3-Clause "New" or "Revised" License](https://gitlab.com/cznic/memory/-/blob/master/LICENSE)
- modernc.org/sqlite [BSD 3-Clause "New" or "Revised" License](https://gitlab.com/cznic/sqlite/-/blob/master/LICENSE)
21 changes: 21 additions & 0 deletions plugins/inputs/prometheus/README.md
@@ -33,6 +33,16 @@ in Prometheus format.
## - prometheus.io/path: If the metrics path is not /metrics, define it with this annotation.
## - prometheus.io/port: If port is not 9102 use this annotation
# monitor_kubernetes_pods = true
## Get the list of pods to scrape with either the scope of
## - cluster: the kubernetes watch api (default), no need to specify
## - node: the local cadvisor api; for scalability. Note that the config node_ip or the environment variable NODE_IP must be set to the host IP.
# pod_scrape_scope = "cluster"
## Only for node scrape scope: node IP of the node that telegraf is running on.
## Either this config or the environment variable NODE_IP must be set.
# node_ip = "10.180.1.1"
## Only for node scrape scope: interval in seconds for how often to get updated pod list for scraping
## Default is 60 seconds.
# pod_scrape_interval = 60
## Restricts Kubernetes monitoring to a single namespace
## ex: monitor_kubernetes_pods_namespace = "default"
# monitor_kubernetes_pods_namespace = ""
@@ -88,6 +98,17 @@ Currently the following annotation are supported:

Using the `monitor_kubernetes_pods_namespace` option allows you to limit which pods you are scraping.

Using `pod_scrape_scope = "node"` scales better on large clusters: Telegraf scrapes only the pods on the node where it is running and fetches the pod list locally from that node's kubelet. This requires running Telegraf on every node of the cluster. Either `node_ip` must be specified in the config or the environment variable `NODE_IP` must be set to the host IP. The latter can be done in the yaml of the pod running telegraf:
```
env:
  - name: NODE_IP
    valueFrom:
      fieldRef:
        fieldPath: status.hostIP
```

If using node level scrape scope, `pod_scrape_interval` specifies how often (in seconds) the pod list for scraping should be updated. If not specified, the default is 60 seconds.
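
The precedence described above (the `node_ip` config first, then the `NODE_IP` environment variable) can be sketched as follows. This is a minimal illustration, not the plugin's code: `resolveNodeIP` and `podListURL` are hypothetical names, and the IP validation step is an assumption added for the example. Port 10250 and the `/pods` path are taken from the kubelet request in this PR.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"os"
)

// resolveNodeIP is a hypothetical helper (not part of the plugin).
// It prefers the configured node_ip and falls back to the NODE_IP
// environment variable, which is looked up through getenv.
func resolveNodeIP(configured string, getenv func(string) string) (string, error) {
	candidate := configured
	if candidate == "" {
		candidate = getenv("NODE_IP")
	}
	if candidate == "" {
		return "", errors.New("node_ip is not configured and NODE_IP is not set")
	}
	// Assumption for this sketch: reject values that are not IP addresses.
	if net.ParseIP(candidate) == nil {
		return "", fmt.Errorf("%q is not a valid IP address", candidate)
	}
	return candidate, nil
}

// podListURL builds the kubelet pod list URL for a node IP.
// net.JoinHostPort adds brackets around IPv6 addresses.
func podListURL(nodeIP string) string {
	return "https://" + net.JoinHostPort(nodeIP, "10250") + "/pods"
}

func main() {
	ip, err := resolveNodeIP("", os.Getenv)
	if err != nil {
		fmt.Println("cannot use node scrape scope:", err)
		return
	}
	fmt.Println("kubelet pod list URL:", podListURL(ip))
}
```

Passing the environment lookup as a function keeps the helper easy to test without changing the process environment.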

#### Bearer Token

If set, the file specified by the `bearer_token` parameter will be read on
181 changes: 176 additions & 5 deletions plugins/inputs/prometheus/kubernetes.go
@@ -2,10 +2,12 @@ package prometheus

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"net/url"
"os/user"
"path/filepath"
@@ -15,13 +17,29 @@ import (
"github.com/ericchiang/k8s"
corev1 "github.com/ericchiang/k8s/apis/core/v1"
"github.com/ghodss/yaml"
"github.com/kubernetes/apimachinery/pkg/fields"
"github.com/kubernetes/apimachinery/pkg/labels"
)

type payload struct {
eventype string
pod *corev1.Pod
}

type podMetadata struct {
ResourceVersion string `json:"resourceVersion"`
SelfLink string `json:"selfLink"`
}

type podResponse struct {
Kind string `json:"kind"`
ApiVersion string `json:"apiVersion"`
Metadata podMetadata `json:"metadata"`
Items []*corev1.Pod `json:"items,string,omitempty"`
}

const cAdvisorPodListDefaultInterval = 60

// loadClient parses a kubeconfig from a file and returns a Kubernetes
// client. It does not support extensions or client auth providers.
func loadClient(kubeconfigPath string) (*k8s.Client, error) {
@@ -66,9 +84,16 @@ func (p *Prometheus) start(ctx context.Context) error {
case <-ctx.Done():
return
case <-time.After(time.Second):
err := p.watch(ctx, client)
if err != nil {
p.Log.Errorf("Unable to watch resources: %s", err.Error())
if p.isNodeScrapeScope {
err = p.cAdvisor(ctx, client)
if err != nil {
p.Log.Errorf("Unable to monitor pods with node scrape scope: %s", err.Error())
}
} else {
err = p.watch(ctx, client)
if err != nil {
p.Log.Errorf("Unable to watch resources: %s", err.Error())
}
}
}
}
@@ -126,6 +151,147 @@ func (p *Prometheus) watch(ctx context.Context, client *k8s.Client) error {
}
}

func (p *Prometheus) cAdvisor(ctx context.Context, client *k8s.Client) error {
// Set InsecureSkipVerify for cAdvisor client since Node IP will not be a SAN for the CA cert
tlsConfig := client.Client.Transport.(*http.Transport).TLSClientConfig
tlsConfig.InsecureSkipVerify = true

// The request will be the same each time
podsUrl := fmt.Sprintf("https://%s:10250/pods", p.NodeIP)
req, err := http.NewRequest("GET", podsUrl, nil)
if err != nil {
return fmt.Errorf("Error when creating request to %s to get pod list: %w", podsUrl, err)
}
client.SetHeaders(req.Header)

// Update right away so code is not waiting the length of the specified scrape interval initially
err = updateCadvisorPodList(ctx, p, client, req)
if err != nil {
return fmt.Errorf("Error initially updating pod list: %w", err)
}

scrapeInterval := cAdvisorPodListDefaultInterval
if p.PodScrapeInterval != 0 {
scrapeInterval = p.PodScrapeInterval
}

for {
select {
case <-ctx.Done():
return nil
case <-time.After(time.Duration(scrapeInterval) * time.Second):
err := updateCadvisorPodList(ctx, p, client, req)
if err != nil {
return fmt.Errorf("Error updating pod list: %w", err)
}
}
}
}
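
The update-then-poll shape of `cAdvisor` can be sketched in isolation as below. This is a standalone illustration under stated assumptions, not the plugin's code: `podListInterval`, `pollPodList` and `runUntilFailure` are hypothetical names, the sketch uses a `time.Ticker` where the diff uses `time.After`, and it treats any non-positive interval as "use the default" where the diff only checks for zero.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

const defaultPodListInterval = 60 * time.Second

// podListInterval converts a configured interval in seconds to a Duration.
// Assumption for this sketch: zero or negative values select the default.
func podListInterval(seconds int) time.Duration {
	if seconds <= 0 {
		return defaultPodListInterval
	}
	return time.Duration(seconds) * time.Second
}

// pollPodList calls update once right away, then once per interval,
// until ctx is cancelled (returns nil) or update fails (returns the error).
func pollPodList(ctx context.Context, interval time.Duration, update func() error) error {
	if err := update(); err != nil {
		return fmt.Errorf("initial pod list update: %w", err)
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			if err := update(); err != nil {
				return fmt.Errorf("pod list update: %w", err)
			}
		}
	}
}

// runUntilFailure drives pollPodList with a fake update that fails on
// call number failOn, and reports how many calls were made.
func runUntilFailure(intervalMillis, failOn int) (int, error) {
	calls := 0
	err := pollPodList(context.Background(), time.Duration(intervalMillis)*time.Millisecond, func() error {
		calls++
		if calls >= failOn {
			return errors.New("kubelet unavailable")
		}
		return nil
	})
	return calls, err
}

func main() {
	calls, err := runUntilFailure(1, 3)
	fmt.Println(calls, err)
}
```

In the PR an error returned from the loop is logged by `start`, which calls `cAdvisor` again on its next one-second iteration, so a failed update leads to a retry.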

func updateCadvisorPodList(ctx context.Context, p *Prometheus, client *k8s.Client, req *http.Request) error {

resp, err := client.Client.Do(req)
if err != nil {
return fmt.Errorf("Error when making request for pod list: %w", err)
}
// Close the body on every return path, including the non-200 case below
defer resp.Body.Close()

// If err is nil, still check response code
if resp.StatusCode != 200 {
return fmt.Errorf("Error when making request for pod list with status %s", resp.Status)
}

cadvisorPodsResponse := podResponse{}

// Will have expected type errors for some parts of corev1.Pod struct for some unused fields
// Instead have nil checks for every used field in case of incorrect decoding
json.NewDecoder(resp.Body).Decode(&cadvisorPodsResponse)
pods := cadvisorPodsResponse.Items

// Updating pod list to be latest cadvisor response
p.lock.Lock()
p.kubernetesPods = make(map[string]URLAndAddress)

// Register pod only if it has an annotation to scrape, if it is ready,
// and if namespace and selectors are specified and match
for _, pod := range pods {
if necessaryPodFieldsArePresent(pod) &&
pod.GetMetadata().GetAnnotations()["prometheus.io/scrape"] == "true" &&
podReady(pod.GetStatus().GetContainerStatuses()) &&
podHasMatchingNamespace(pod, p) &&
podHasMatchingLabelSelector(pod, p.podLabelSelector) &&
podHasMatchingFieldSelector(pod, p.podFieldSelector) {
registerPod(pod, p)
}

}
p.lock.Unlock()

// No errors
return nil
}
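
To illustrate the decode-and-filter step, here is a standalone sketch that parses a pod list payload and keeps only pods annotated with `prometheus.io/scrape: "true"`. It is not the plugin's code. The trimmed-down `podMeta`, `podItem` and `podList` types, the `scrapablePods` function and the `samplePodList` payload are all hypothetical, whereas the PR decodes into `corev1.Pod`. Unlike the diff, this sketch returns the decode error instead of ignoring it.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// Minimal, hypothetical types holding only the fields this sketch needs.
type podMeta struct {
	Name        string            `json:"name"`
	Namespace   string            `json:"namespace"`
	Annotations map[string]string `json:"annotations"`
}

type podItem struct {
	Metadata podMeta `json:"metadata"`
}

type podList struct {
	Items []podItem `json:"items"`
}

// samplePodList is made-up data in the general shape of a pod list response.
const samplePodList = `{
  "kind": "PodList",
  "apiVersion": "v1",
  "items": [
    {"metadata": {"name": "web", "namespace": "default",
                  "annotations": {"prometheus.io/scrape": "true"}}},
    {"metadata": {"name": "db", "namespace": "default"}}
  ]
}`

// scrapablePods returns "namespace/name" for every pod that opts in to
// scraping through the prometheus.io/scrape annotation.
func scrapablePods(body string) ([]string, error) {
	var list podList
	if err := json.NewDecoder(strings.NewReader(body)).Decode(&list); err != nil {
		return nil, fmt.Errorf("decoding pod list: %w", err)
	}
	var names []string
	for _, item := range list.Items {
		// Reading from a nil map is safe in Go and yields the empty string.
		if item.Metadata.Annotations["prometheus.io/scrape"] == "true" {
			names = append(names, item.Metadata.Namespace+"/"+item.Metadata.Name)
		}
	}
	return names, nil
}

func main() {
	names, err := scrapablePods(samplePodList)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(names)
}
```

Decoding into a narrow struct avoids the type errors that the diff's comment expects from unused `corev1.Pod` fields. The cost is a second set of types to maintain.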

func necessaryPodFieldsArePresent(pod *corev1.Pod) bool {
return pod.GetMetadata() != nil &&
pod.GetMetadata().GetAnnotations() != nil &&
pod.GetMetadata().GetLabels() != nil &&
pod.GetSpec() != nil &&
pod.GetStatus() != nil &&
pod.GetStatus().GetContainerStatuses() != nil
}

/* See the docs on kubernetes label selectors:
* https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors
*/
func podHasMatchingLabelSelector(pod *corev1.Pod, labelSelector labels.Selector) bool {
if labelSelector == nil {
return true
}

var labelsSet labels.Set = pod.GetMetadata().GetLabels()
return labelSelector.Matches(labelsSet)
}

/* See ToSelectableFields() for list of fields that are selectable:
* https://github.com/kubernetes/kubernetes/release-1.20/pkg/registry/core/pod/strategy.go
* See docs on kubernetes field selectors:
* https://kubernetes.io/docs/concepts/overview/working-with-objects/field-selectors/
*/
func podHasMatchingFieldSelector(pod *corev1.Pod, fieldSelector fields.Selector) bool {
if fieldSelector == nil {
return true
}

podSpec := pod.GetSpec()
podStatus := pod.GetStatus()

// Spec and Status shouldn't be nil.
// Error handling just in case something goes wrong but won't crash telegraf
if podSpec == nil || podStatus == nil {
return false
}

fieldsSet := make(fields.Set)
fieldsSet["spec.nodeName"] = podSpec.GetNodeName()
fieldsSet["spec.restartPolicy"] = podSpec.GetRestartPolicy()
fieldsSet["spec.schedulerName"] = podSpec.GetSchedulerName()
fieldsSet["spec.serviceAccountName"] = podSpec.GetServiceAccountName()
fieldsSet["status.phase"] = podStatus.GetPhase()
fieldsSet["status.podIP"] = podStatus.GetPodIP()
fieldsSet["status.nominatedNodeName"] = podStatus.GetNominatedNodeName()

return fieldSelector.Matches(fieldsSet)
}
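
The selector checks above delegate parsing and matching to the apimachinery `fields` and `labels` packages. As a rough illustration of what equality-based matching involves, here is a simplified standard-library stand-in. It is not the apimachinery implementation: `requirement`, `parseSimpleSelector` and `matches` are hypothetical names, only `=`, `==` and `!=` are handled, and a missing field is treated as the empty string.

```go
package main

import (
	"fmt"
	"strings"
)

// requirement is one term of a selector, e.g. spec.restartPolicy=Always.
type requirement struct {
	field  string
	value  string
	negate bool // true for !=
}

// parseSimpleSelector splits a comma-separated selector into requirements.
// A term without an operator (for example "spec.nodeName") is an error.
func parseSimpleSelector(selector string) ([]requirement, error) {
	var reqs []requirement
	if strings.TrimSpace(selector) == "" {
		return reqs, nil
	}
	for _, term := range strings.Split(selector, ",") {
		var op string
		switch {
		case strings.Contains(term, "!="):
			op = "!="
		case strings.Contains(term, "=="):
			op = "=="
		case strings.Contains(term, "="):
			op = "="
		default:
			return nil, fmt.Errorf("invalid selector term %q: expected =, == or !=", term)
		}
		parts := strings.SplitN(term, op, 2)
		reqs = append(reqs, requirement{
			field:  strings.TrimSpace(parts[0]),
			value:  strings.TrimSpace(parts[1]),
			negate: op == "!=",
		})
	}
	return reqs, nil
}

// matches reports whether every requirement holds for the given field set.
func matches(reqs []requirement, fields map[string]string) bool {
	for _, r := range reqs {
		equal := fields[r.field] == r.value
		if equal == r.negate {
			return false
		}
	}
	return true
}

func main() {
	podFields := map[string]string{
		"status.podIP":       "127.0.0.1",
		"spec.restartPolicy": "Always",
		"spec.nodeName":      "node1000",
	}
	reqs, err := parseSimpleSelector("status.podIP=127.0.0.1,spec.restartPolicy=Always,spec.nodeName!=nodeName")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(matches(reqs, podFields))
}
```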

/*
* If a namespace is specified and the pod doesn't have that namespace, return false
* Else return true
*/
func podHasMatchingNamespace(pod *corev1.Pod, p *Prometheus) bool {
return !(p.PodNamespace != "" && pod.GetMetadata().GetNamespace() != p.PodNamespace)
}

func podReady(statuss []*corev1.ContainerStatus) bool {
if len(statuss) == 0 {
return false
@@ -180,14 +346,19 @@ func registerPod(pod *corev1.Pod, p *Prometheus) {
return
}
podURL := p.AddressToURL(URL, URL.Hostname())
p.lock.Lock()

// Locks earlier if using cAdvisor calls - makes a new list each time
// rather than updating and removing from the same list
if !p.isNodeScrapeScope {
p.lock.Lock()
defer p.lock.Unlock()
}
p.kubernetesPods[podURL.String()] = URLAndAddress{
URL: podURL,
Address: URL.Hostname(),
OriginalURL: URL,
Tags: tags,
}
p.lock.Unlock()
}
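
`registerPod` skips locking in node scrape scope because `updateCadvisorPodList` already holds `p.lock` while it rebuilds the map, and Go's `sync.Mutex` is not reentrant. One alternative way to express the same constraint, shown here only as a sketch and not as what the PR does, is to split registration into a locking wrapper and an inner function that assumes the lock is held. `targetRegistry` and its methods are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// targetRegistry is a hypothetical stand-in for the plugin's pod map.
type targetRegistry struct {
	mu      sync.Mutex
	targets map[string]string // scrape URL -> pod address
}

func newTargetRegistry() *targetRegistry {
	return &targetRegistry{targets: make(map[string]string)}
}

// add registers one target and takes the lock itself.
// This fits the watch path, where pods arrive one event at a time.
func (r *targetRegistry) add(url, address string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.addLocked(url, address)
}

// addLocked registers one target. The caller must already hold r.mu.
func (r *targetRegistry) addLocked(url, address string) {
	r.targets[url] = address
}

// replaceAll takes the lock once, starts from an empty map and registers
// every target. This fits the node scope path, where each poll yields
// the complete pod list.
func (r *targetRegistry) replaceAll(next map[string]string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.targets = make(map[string]string, len(next))
	for url, address := range next {
		r.addLocked(url, address)
	}
}

// snapshot returns a copy of the current targets.
func (r *targetRegistry) snapshot() map[string]string {
	r.mu.Lock()
	defer r.mu.Unlock()
	out := make(map[string]string, len(r.targets))
	for url, address := range r.targets {
		out[url] = address
	}
	return out
}

func main() {
	reg := newTargetRegistry()
	reg.add("http://10.0.0.1:9102/metrics", "10.0.0.1")
	reg.replaceAll(map[string]string{"http://10.0.0.2:9102/metrics": "10.0.0.2"})
	fmt.Println(len(reg.snapshot()))
}
```

With this split, the locking rule lives in the method names and does not depend on a mode flag checked inside the function.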

func getScrapeURL(pod *corev1.Pod) *string {
59 changes: 59 additions & 0 deletions plugins/inputs/prometheus/kubernetes_test.go
@@ -9,6 +9,9 @@ import (

v1 "github.com/ericchiang/k8s/apis/core/v1"
metav1 "github.com/ericchiang/k8s/apis/meta/v1"

"github.com/kubernetes/apimachinery/pkg/fields"
"github.com/kubernetes/apimachinery/pkg/labels"
)

func TestScrapeURLNoAnnotations(t *testing.T) {
@@ -142,6 +145,62 @@ func TestPodSelector(t *testing.T) {
}
}

func TestPodHasMatchingNamespace(t *testing.T) {
prom := &Prometheus{Log: testutil.Logger{}, PodNamespace: "default"}

pod := pod()
pod.Metadata.Name = str("Pod1")
pod.Metadata.Namespace = str("default")
shouldMatch := podHasMatchingNamespace(pod, prom)
assert.Equal(t, true, shouldMatch)

pod.Metadata.Name = str("Pod2")
pod.Metadata.Namespace = str("namespace")
shouldNotMatch := podHasMatchingNamespace(pod, prom)
assert.Equal(t, false, shouldNotMatch)
}

func TestPodHasMatchingLabelSelector(t *testing.T) {
labelSelectorString := "label0==label0,label1=label1,label2!=label,label3 in (label1,label2, label3),label4 notin (label1, label2,label3),label5,!label6"
prom := &Prometheus{Log: testutil.Logger{}, KubernetesLabelSelector: labelSelectorString}

pod := pod()
pod.Metadata.Labels = make(map[string]string)
pod.Metadata.Labels["label0"] = "label0"
pod.Metadata.Labels["label1"] = "label1"
pod.Metadata.Labels["label2"] = "label2"
pod.Metadata.Labels["label3"] = "label3"
pod.Metadata.Labels["label4"] = "label4"
pod.Metadata.Labels["label5"] = "label5"

labelSelector, err := labels.Parse(prom.KubernetesLabelSelector)
assert.Equal(t, err, nil)
assert.Equal(t, true, podHasMatchingLabelSelector(pod, labelSelector))
}

func TestPodHasMatchingFieldSelector(t *testing.T) {
fieldSelectorString := "status.podIP=127.0.0.1,spec.restartPolicy=Always,spec.NodeName!=nodeName"
prom := &Prometheus{Log: testutil.Logger{}, KubernetesFieldSelector: fieldSelectorString}
pod := pod()
pod.Spec.RestartPolicy = str("Always")
pod.Spec.NodeName = str("node1000")

fieldSelector, err := fields.ParseSelector(prom.KubernetesFieldSelector)
assert.Equal(t, err, nil)
assert.Equal(t, true, podHasMatchingFieldSelector(pod, fieldSelector))
}

func TestInvalidFieldSelector(t *testing.T) {
fieldSelectorString := "status.podIP=127.0.0.1,spec.restartPolicy=Always,spec.NodeName!=nodeName,spec.nodeName"
prom := &Prometheus{Log: testutil.Logger{}, KubernetesFieldSelector: fieldSelectorString}
pod := pod()
pod.Spec.RestartPolicy = str("Always")
pod.Spec.NodeName = str("node1000")

_, err := fields.ParseSelector(prom.KubernetesFieldSelector)
assert.NotEqual(t, err, nil)
}

func pod() *v1.Pod {
p := &v1.Pod{Metadata: &metav1.ObjectMeta{}, Status: &v1.PodStatus{}, Spec: &v1.PodSpec{}}
p.Status.PodIP = str("127.0.0.1")