[kubernetes_state] improves tagging compliance
The `kubernetes_state` and `kubernetes` checks were using different sets of tag names
to convey the same information.

This change introduces the `kubernetes` check's tag set in the `kubernetes_state`
check.

To ease user migration to the new tag names, this commit also introduces a new
parameter, `keep_ksm_labels`, which keeps both tag sets on `kubernetes_state` metrics.
`keep_ksm_labels` is currently set to `True` by default.
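
For illustration (not part of this commit's diff), with `keep_ksm_labels` left at its default of `True`, a metric carrying the kube-state-metrics label `pod="web-0"` is tagged with both the new and the legacy tag name:

    pod_name:web-0   # new tag, aligned with the `kubernetes` check
    pod:web-0        # legacy `kubernetes_state` tag, kept during the migration

Setting `keep_ksm_labels` to `false` in the instance configuration drops the legacy tag and keeps only the new one.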
clamoriniere committed Nov 28, 2019
1 parent 4f19c03 commit 4e3f724
Showing 2 changed files with 133 additions and 65 deletions.
161 changes: 109 additions & 52 deletions kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py
@@ -12,6 +12,7 @@
from datadog_checks.checks.openmetrics import OpenMetricsBaseCheck
from datadog_checks.config import is_affirmative
from datadog_checks.errors import CheckException
from datadog_checks.utils.common import to_string

try:
# this module is only available in agent 6
@@ -28,6 +29,21 @@ def get_clustername():
WHITELISTED_WAITING_REASONS = ['errimagepull', 'imagepullbackoff', 'crashloopbackoff', 'containercreating']
WHITELISTED_TERMINATED_REASONS = ['oomkilled', 'containercannotrun', 'error']

kube_labels_mapper = {
'namespace': 'kube_namespace',
'job': 'kube_job',
'cronjob': 'kube_cronjob',
'pod': 'pod_name',
'phase': 'pod_phase',
'daemonset': 'kube_daemon_set',
'replicationcontroller': 'kube_replication_controller',
'replicaset': 'kube_replica_set',
'statefulset': 'kube_stateful_set',
'deployment': 'kube_deployment',
'container': 'kube_container_name',
'container_id': 'container_id',
'image': 'image_name',
}

class KubernetesState(OpenMetricsBaseCheck):
"""
@@ -58,6 +74,11 @@ def __init__(self, name, init_config, agentConfig, instances=None):
instance = instances[0]
kubernetes_state_instance = self._create_kubernetes_state_prometheus_instance(instance)

# First deprecation phase: we keep ksm labels by default
# Next iteration: remove ksm labels by default
# Last iteration: remove this option
self.keep_ksm_labels = kubernetes_state_instance.get('keep_ksm_labels', True)

generic_instances = [kubernetes_state_instance]
super(KubernetesState, self).__init__(name, init_config, agentConfig, instances=generic_instances)

@@ -424,6 +445,18 @@ def _label_to_tag(self, name, labels, scraper_config, tag_name=None):
else:
return None

def _label_to_tags(self, name, labels, scraper_config, tag_name=None):
"""
Search for `name` in the sample labels and return the corresponding list of tags.
The tag name defaults to the label name if not specified.
Returns an empty list if the label was not found.
"""
value = labels.get(name)
_tags = list()
if value:
_tags += self._build_tags(tag_name or name, value, scraper_config)
return _tags

def _trim_job_tag(self, name):
"""
Trims suffix of job names if they match -(\\d{4,10}$)
@@ -456,10 +489,9 @@ def kube_pod_status_phase(self, metric, scraper_config):
for sample in metric.samples:
# Counts aggregated cluster-wide to avoid no-data issues on pod churn,
# pod granularity available in the service checks
tags = [
self._label_to_tag('namespace', sample[self.SAMPLE_LABELS], scraper_config),
self._label_to_tag('phase', sample[self.SAMPLE_LABELS], scraper_config),
] + scraper_config['custom_tags']
tags = self._label_to_tags('namespace', sample[self.SAMPLE_LABELS], scraper_config) \
+ self._label_to_tags('phase', sample[self.SAMPLE_LABELS], scraper_config) \
+ scraper_config['custom_tags']
status_phase_counter[tuple(sorted(tags))] += sample[self.SAMPLE_VALUE]

for tags, count in iteritems(status_phase_counter):
@@ -477,20 +509,18 @@ def _submit_metric_kube_pod_container_status_reason(
if reason:
# Filtering according to the reason here is paramount to limit cardinality
if reason.lower() in whitelisted_status_reasons:
tags.append(self._format_tag('reason', reason, scraper_config))
tags += self._build_tags('reason', reason, scraper_config)
else:
continue

if 'container' in sample[self.SAMPLE_LABELS]:
tags.append(
self._format_tag('kube_container_name', sample[self.SAMPLE_LABELS]['container'], scraper_config)
)
tags += self._build_tags('kube_container_name', sample[self.SAMPLE_LABELS]['container'], scraper_config)

if 'namespace' in sample[self.SAMPLE_LABELS]:
tags.append(self._format_tag('namespace', sample[self.SAMPLE_LABELS]['namespace'], scraper_config))
tags += self._build_tags('namespace', sample[self.SAMPLE_LABELS]['namespace'], scraper_config)

if 'pod' in sample[self.SAMPLE_LABELS]:
tags.append(self._format_tag('pod', sample[self.SAMPLE_LABELS]['pod'], scraper_config))
tags += self._build_tags('pod', sample[self.SAMPLE_LABELS]['pod'], scraper_config)

self.gauge(
metric_name,
@@ -516,10 +546,10 @@ def kube_cronjob_next_schedule_time(self, metric, scraper_config):
curr_time = int(time.time())
for sample in metric.samples:
on_schedule = int(sample[self.SAMPLE_VALUE]) - curr_time
tags = [
self._format_tag(label_name, label_value, scraper_config)
for label_name, label_value in iteritems(sample[self.SAMPLE_LABELS])
]
tags = list()
for label_name, label_value in iteritems(sample[self.SAMPLE_LABELS]):
tags += self._build_tags(label_name, label_value, scraper_config)

tags += scraper_config['custom_tags']
if on_schedule < 0:
message = "The service check scheduled at {} is {} seconds late".format(
@@ -536,9 +566,9 @@ def kube_job_complete(self, metric, scraper_config):
for label_name, label_value in iteritems(sample[self.SAMPLE_LABELS]):
if label_name == 'job' or label_name == 'job_name':
trimmed_job = self._trim_job_tag(label_value)
tags.append(self._format_tag(label_name, trimmed_job, scraper_config))
tags += self._build_tags(label_name, trimmed_job, scraper_config)
else:
tags.append(self._format_tag(label_name, label_value, scraper_config))
tags += self._build_tags(label_name, label_value, scraper_config)
self.service_check(service_check_name, self.OK, tags=tags + scraper_config['custom_tags'])

def kube_job_failed(self, metric, scraper_config):
@@ -548,9 +578,9 @@ def kube_job_failed(self, metric, scraper_config):
for label_name, label_value in iteritems(sample[self.SAMPLE_LABELS]):
if label_name == 'job' or label_name == 'job_name':
trimmed_job = self._trim_job_tag(label_value)
tags.append(self._format_tag(label_name, trimmed_job, scraper_config))
tags += self._build_tags(label_name, trimmed_job, scraper_config)
else:
tags.append(self._format_tag(label_name, label_value, scraper_config))
tags += self._build_tags(label_name, label_value, scraper_config)
self.service_check(service_check_name, self.CRITICAL, tags=tags + scraper_config['custom_tags'])

def kube_job_status_failed(self, metric, scraper_config):
@@ -561,9 +591,9 @@ def kube_job_status_failed(self, metric, scraper_config):
if label_name == 'job' or label_name == 'job_name':
trimmed_job = self._trim_job_tag(label_value)
job_ts = self._extract_job_timestamp(label_value)
tags.append(self._format_tag(label_name, trimmed_job, scraper_config))
tags += self._build_tags(label_name, trimmed_job, scraper_config)
else:
tags.append(self._format_tag(label_name, label_value, scraper_config))
tags += self._build_tags(label_name, label_value, scraper_config)
self.failed_job_counts[frozenset(tags)].update_current_ts_and_add_count(job_ts, sample[self.SAMPLE_VALUE])

def kube_job_status_succeeded(self, metric, scraper_config):
@@ -574,9 +604,9 @@ def kube_job_status_succeeded(self, metric, scraper_config):
if label_name == 'job' or label_name == 'job_name':
trimmed_job = self._trim_job_tag(label_value)
job_ts = self._extract_job_timestamp(label_value)
tags.append(self._format_tag(label_name, trimmed_job, scraper_config))
tags += self._build_tags(label_name, trimmed_job, scraper_config)
else:
tags.append(self._format_tag(label_name, label_value, scraper_config))
tags += self._build_tags(label_name, label_value, scraper_config)
self.succeeded_job_counts[frozenset(tags)].update_current_ts_and_add_count(
job_ts, sample[self.SAMPLE_VALUE]
)
@@ -588,21 +618,20 @@ def kube_node_status_condition(self, metric, scraper_config):
by_condition_counter = Counter()

for sample in metric.samples:
node_tag = self._label_to_tag("node", sample[self.SAMPLE_LABELS], scraper_config)
node_tags = self._label_to_tags("node", sample[self.SAMPLE_LABELS], scraper_config)
self._condition_to_tag_check(
sample,
base_check_name,
self.condition_to_status_positive,
scraper_config,
tags=[node_tag] + scraper_config['custom_tags'],
tags=node_tags + scraper_config['custom_tags'],
)

# Counts aggregated cluster-wide to avoid no-data issues on node churn,
# node granularity available in the service checks
tags = [
self._label_to_tag("condition", sample[self.SAMPLE_LABELS], scraper_config),
self._label_to_tag("status", sample[self.SAMPLE_LABELS], scraper_config),
] + scraper_config['custom_tags']
tags = self._label_to_tags("condition", sample[self.SAMPLE_LABELS], scraper_config) \
+ self._label_to_tags("status", sample[self.SAMPLE_LABELS], scraper_config) \
+ scraper_config['custom_tags']
by_condition_counter[tuple(sorted(tags))] += sample[self.SAMPLE_VALUE]

for tags, count in iteritems(by_condition_counter):
@@ -612,60 +641,60 @@ def kube_node_status_ready(self, metric, scraper_config):
""" The ready status of a cluster node (legacy)"""
service_check_name = scraper_config['namespace'] + '.node.ready'
for sample in metric.samples:
node_tag = self._label_to_tag("node", sample[self.SAMPLE_LABELS], scraper_config)
node_tags = self._label_to_tags("node", sample[self.SAMPLE_LABELS], scraper_config)
self._condition_to_service_check(
sample,
service_check_name,
self.condition_to_status_positive,
tags=[node_tag] + scraper_config['custom_tags'],
tags=node_tags + scraper_config['custom_tags'],
)

def kube_node_status_out_of_disk(self, metric, scraper_config):
""" Whether the node is out of disk space (legacy)"""
service_check_name = scraper_config['namespace'] + '.node.out_of_disk'
for sample in metric.samples:
node_tag = self._label_to_tag("node", sample[self.SAMPLE_LABELS], scraper_config)
node_tags = self._label_to_tags("node", sample[self.SAMPLE_LABELS], scraper_config)
self._condition_to_service_check(
sample,
service_check_name,
self.condition_to_status_negative,
tags=[node_tag] + scraper_config['custom_tags'],
tags=node_tags + scraper_config['custom_tags'],
)

def kube_node_status_memory_pressure(self, metric, scraper_config):
""" Whether the node is in a memory pressure state (legacy)"""
service_check_name = scraper_config['namespace'] + '.node.memory_pressure'
for sample in metric.samples:
node_tag = self._label_to_tag("node", sample[self.SAMPLE_LABELS], scraper_config)
node_tags = self._label_to_tags("node", sample[self.SAMPLE_LABELS], scraper_config)
self._condition_to_service_check(
sample,
service_check_name,
self.condition_to_status_negative,
tags=[node_tag] + scraper_config['custom_tags'],
tags=node_tags + scraper_config['custom_tags'],
)

def kube_node_status_disk_pressure(self, metric, scraper_config):
""" Whether the node is in a disk pressure state (legacy)"""
service_check_name = scraper_config['namespace'] + '.node.disk_pressure'
for sample in metric.samples:
node_tag = self._label_to_tag("node", sample[self.SAMPLE_LABELS], scraper_config)
node_tags = self._label_to_tags("node", sample[self.SAMPLE_LABELS], scraper_config)
self._condition_to_service_check(
sample,
service_check_name,
self.condition_to_status_negative,
tags=[node_tag] + scraper_config['custom_tags'],
tags=node_tags + scraper_config['custom_tags'],
)

def kube_node_status_network_unavailable(self, metric, scraper_config):
""" Whether the node is in a network unavailable state (legacy)"""
service_check_name = scraper_config['namespace'] + '.node.network_unavailable'
for sample in metric.samples:
node_tag = self._label_to_tag("node", sample[self.SAMPLE_LABELS], scraper_config)
node_tags = self._label_to_tags("node", sample[self.SAMPLE_LABELS], scraper_config)
self._condition_to_service_check(
sample,
service_check_name,
self.condition_to_status_negative,
tags=[node_tag] + scraper_config['custom_tags'],
tags=node_tags + scraper_config['custom_tags'],
)

def kube_node_spec_unschedulable(self, metric, scraper_config):
@@ -674,13 +703,12 @@ def kube_node_spec_unschedulable(self, metric, scraper_config):
statuses = ('schedulable', 'unschedulable')
if metric.type in METRIC_TYPES:
for sample in metric.samples:
tags = [
self._format_tag(label_name, label_value, scraper_config)
for label_name, label_value in iteritems(sample[self.SAMPLE_LABELS])
]
tags = list()
for label_name, label_value in iteritems(sample[self.SAMPLE_LABELS]):
tags += self._build_tags(label_name, label_value, scraper_config)
tags += scraper_config['custom_tags']
status = statuses[int(sample[self.SAMPLE_VALUE])] # value can be 0 or 1
tags.append(self._format_tag('status', status, scraper_config))
tags += self._build_tags('status', status, scraper_config)
self.gauge(metric_name, 1, tags) # metric value is always one, value is on the tags
else:
self.log.error("Metric type %s unsupported for metric %s" % (metric.type, metric.name))
@@ -693,10 +721,9 @@ def kube_resourcequota(self, metric, scraper_config):
for sample in metric.samples:
mtype = sample[self.SAMPLE_LABELS].get("type")
resource = sample[self.SAMPLE_LABELS].get("resource")
tags = [
self._label_to_tag("namespace", sample[self.SAMPLE_LABELS], scraper_config),
self._label_to_tag("resourcequota", sample[self.SAMPLE_LABELS], scraper_config),
] + scraper_config['custom_tags']
tags = self._label_to_tags("namespace", sample[self.SAMPLE_LABELS], scraper_config) \
+ self._label_to_tags("resourcequota", sample[self.SAMPLE_LABELS], scraper_config) \
+ scraper_config['custom_tags']
self.gauge(metric_base_name.format(resource, suffixes[mtype]), sample[self.SAMPLE_VALUE], tags)
else:
self.log.error("Metric type %s unsupported for metric %s" % (metric.type, metric.name))
@@ -724,11 +751,11 @@ def kube_limitrange(self, metric, scraper_config):
self.error("Constraint %s unsupported for metric %s" % (constraint, metric.name))
continue
resource = sample[self.SAMPLE_LABELS].get("resource")
tags = [
self._label_to_tag("namespace", sample[self.SAMPLE_LABELS], scraper_config),
self._label_to_tag("limitrange", sample[self.SAMPLE_LABELS], scraper_config),
self._label_to_tag("type", sample[self.SAMPLE_LABELS], scraper_config, tag_name="consumer_type"),
] + scraper_config['custom_tags']
tags = self._label_to_tags("namespace", sample[self.SAMPLE_LABELS], scraper_config) \
+ self._label_to_tags("limitrange", sample[self.SAMPLE_LABELS], scraper_config) \
+ self._label_to_tags("limitrange", sample[self.SAMPLE_LABELS], scraper_config) \
+ self._label_to_tags("type", sample[self.SAMPLE_LABELS], scraper_config, tag_name="consumer_type") \
+ scraper_config['custom_tags']
self.gauge(metric_base_name.format(resource, constraint), sample[self.SAMPLE_VALUE], tags)
else:
self.log.error("Metric type %s unsupported for metric %s" % (metric.type, metric.name))
@@ -747,3 +774,33 @@ def count_objects_by_tags(self, metric, scraper_config):

for tags, count in iteritems(object_counter):
self.gauge(metric_name, count, tags=list(tags))


def _build_tags(self, label_name, label_value, scraper_config, hostname=None):
"""
Build a list of formatted tags from the `label_name` parameter. The result also depends on the
check configuration (the 'keep_ksm_labels' parameter).
"""
_tags = list()
# first use the labels_mapper
tag_name = scraper_config['labels_mapper'].get(label_name, label_name)
# then try to use the kube_labels_mapper
kube_tag_name = kube_labels_mapper.get(tag_name, tag_name)
_tags.append('{}:{}'.format(to_string(kube_tag_name), to_string(label_value)))
if self.keep_ksm_labels and (kube_tag_name != tag_name):
_tags.append('{}:{}'.format(to_string(tag_name), to_string(label_value)))
return _tags

def _metric_tags(self, metric_name, val, sample, scraper_config, hostname=None):
"""
Redefine this method to allow labels duplication, during migration phase
"""
custom_tags = scraper_config['custom_tags']
_tags = list(custom_tags)
_tags.extend(scraper_config['_metric_tags'])
for label_name, label_value in iteritems(sample[self.SAMPLE_LABELS]):
if label_name not in scraper_config['exclude_labels']:
_tags += self._build_tags(label_name, label_value, scraper_config)
return self._finalize_tags_to_submit(
_tags, metric_name, val, sample, custom_tags=custom_tags, hostname=hostname
)
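
A minimal sketch of how the new `_build_tags` helper resolves a label (illustrative only; it assumes `labels_mapper` has no entry for the label and `keep_ksm_labels` is left at its default of `True`):

    # self._build_tags('namespace', 'default', scraper_config)
    # -> ['kube_namespace:default', 'namespace:default']
    #
    # With keep_ksm_labels set to False, only the mapped tag is emitted:
    # -> ['kube_namespace:default']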
