From 4e3f724f344b3748a04b7957b1bc25f0d57bf85f Mon Sep 17 00:00:00 2001 From: cedric lamoriniere Date: Thu, 28 Nov 2019 11:02:28 +0100 Subject: [PATCH] [kubernetes_state] improve tagging consistency The `kubernetes_state` and `kubernetes` checks used different sets of tag names to convey the same information. This change introduces the `kubernetes` check's tag set into the `kubernetes_state` check. To ease the migration to the new tag names, this commit also introduces a new parameter, `keep_ksm_labels`, which keeps both tag sets on `kubernetes_state` metrics. For now, `keep_ksm_labels` is set to `True` by default. --- .../kubernetes_state/kubernetes_state.py | 161 ++++++++++++------ .../tests/test_kubernetes_state.py | 37 ++-- 2 files changed, 133 insertions(+), 65 deletions(-) diff --git a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py index c34a0109c50d13..8bb32a1c8ee240 100644 --- a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py +++ b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py @@ -12,6 +12,7 @@ from datadog_checks.checks.openmetrics import OpenMetricsBaseCheck from datadog_checks.config import is_affirmative from datadog_checks.errors import CheckException +from datadog_checks.utils.common import to_string try: # this module is only available in agent 6 @@ -28,6 +29,21 @@ def get_clustername(): WHITELISTED_WAITING_REASONS = ['errimagepull', 'imagepullbackoff', 'crashloopbackoff', 'containercreating'] WHITELISTED_TERMINATED_REASONS = ['oomkilled', 'containercannotrun', 'error'] +kube_labels_mapper = { + 'namespace': 'kube_namespace', + 'job': 'kube_job', + 'cronjob': 'kube_cronjob', + 'pod': 'pod_name', + 'phase': 'pod_phase', + 'daemonset': 'kube_daemon_set', + 'replicationcontroller': 'kube_replication_controller', + 'replicaset': 'kube_replica_set', + 'statefulset': 'kube_stateful_set', + 'deployment': 'kube_deployment', + 'container': 'kube_container_name', + 'container_id': 'container_id', + 'image': 'image_name', +} class KubernetesState(OpenMetricsBaseCheck): """ @@ -58,6 +74,11 @@ def __init__(self, name, init_config, agentConfig, instances=None): instance = instances[0] kubernetes_state_instance = self._create_kubernetes_state_prometheus_instance(instance) + # First deprecation phase: we keep ksm labels by default + # Next iteration: remove ksm labels by default + # Last iteration: remove this option + self.keep_ksm_labels = kubernetes_state_instance.get('keep_ksm_labels', True) + generic_instances = [kubernetes_state_instance] super(KubernetesState, self).__init__(name, init_config, agentConfig, instances=generic_instances) @@ -424,6 +445,18 @@ def _label_to_tag(self, name, labels, scraper_config, tag_name=None): else: return None + def _label_to_tags(self, name, labels, scraper_config, tag_name=None): + """ + Search for `name` in the sample labels and return the corresponding list of tags. + The tag name defaults to the label name if not specified. + Returns an empty list if `name` was not found. 
+ """ + value = labels.get(name) + _tags = list() + if value: + _tags += self._build_tags(tag_name or name, value, scraper_config) + return _tags + def _trim_job_tag(self, name): """ Trims suffix of job names if they match -(\\d{4,10}$) @@ -456,10 +489,9 @@ def kube_pod_status_phase(self, metric, scraper_config): for sample in metric.samples: # Counts aggregated cluster-wide to avoid no-data issues on pod churn, # pod granularity available in the service checks - tags = [ - self._label_to_tag('namespace', sample[self.SAMPLE_LABELS], scraper_config), - self._label_to_tag('phase', sample[self.SAMPLE_LABELS], scraper_config), - ] + scraper_config['custom_tags'] + tags = self._label_to_tags('namespace', sample[self.SAMPLE_LABELS], scraper_config) \ + + self._label_to_tags('phase', sample[self.SAMPLE_LABELS], scraper_config) \ + + scraper_config['custom_tags'] status_phase_counter[tuple(sorted(tags))] += sample[self.SAMPLE_VALUE] for tags, count in iteritems(status_phase_counter): @@ -477,20 +509,18 @@ def _submit_metric_kube_pod_container_status_reason( if reason: # Filtering according to the reason here is paramount to limit cardinality if reason.lower() in whitelisted_status_reasons: - tags.append(self._format_tag('reason', reason, scraper_config)) + tags += self._build_tags('reason', reason, scraper_config) else: continue if 'container' in sample[self.SAMPLE_LABELS]: - tags.append( - self._format_tag('kube_container_name', sample[self.SAMPLE_LABELS]['container'], scraper_config) - ) + tags += self._build_tags('kube_container_name', sample[self.SAMPLE_LABELS]['container'], scraper_config) if 'namespace' in sample[self.SAMPLE_LABELS]: - tags.append(self._format_tag('namespace', sample[self.SAMPLE_LABELS]['namespace'], scraper_config)) + tags += self._build_tags('namespace', sample[self.SAMPLE_LABELS]['namespace'], scraper_config) if 'pod' in sample[self.SAMPLE_LABELS]: - tags.append(self._format_tag('pod', sample[self.SAMPLE_LABELS]['pod'], scraper_config)) + tags += self._build_tags('pod', sample[self.SAMPLE_LABELS]['pod'], scraper_config) self.gauge( metric_name, @@ -516,10 +546,10 @@ def kube_cronjob_next_schedule_time(self, metric, scraper_config): curr_time = int(time.time()) for sample in metric.samples: on_schedule = int(sample[self.SAMPLE_VALUE]) - curr_time - tags = [ - self._format_tag(label_name, label_value, scraper_config) - for label_name, label_value in iteritems(sample[self.SAMPLE_LABELS]) - ] + tags = list() + for label_name, label_value in iteritems(sample[self.SAMPLE_LABELS]): + tags += self._build_tags(label_name, label_value, scraper_config) + tags += scraper_config['custom_tags'] if on_schedule < 0: message = "The service check scheduled at {} is {} seconds late".format( @@ -536,9 +566,9 @@ def kube_job_complete(self, metric, scraper_config): for label_name, label_value in iteritems(sample[self.SAMPLE_LABELS]): if label_name == 'job' or label_name == 'job_name': trimmed_job = self._trim_job_tag(label_value) - tags.append(self._format_tag(label_name, trimmed_job, scraper_config)) + tags += self._build_tags(label_name, trimmed_job, scraper_config) else: - tags.append(self._format_tag(label_name, label_value, scraper_config)) + tags += self._build_tags(label_name, label_value, scraper_config) self.service_check(service_check_name, self.OK, tags=tags + scraper_config['custom_tags']) def kube_job_failed(self, metric, scraper_config): @@ -548,9 +578,9 @@ def kube_job_failed(self, metric, scraper_config): for label_name, label_value in iteritems(sample[self.SAMPLE_LABELS]): if 
label_name == 'job' or label_name == 'job_name': trimmed_job = self._trim_job_tag(label_value) - tags.append(self._format_tag(label_name, trimmed_job, scraper_config)) + tags += self._build_tags(label_name, trimmed_job, scraper_config) else: - tags.append(self._format_tag(label_name, label_value, scraper_config)) + tags += self._build_tags(label_name, label_value, scraper_config) self.service_check(service_check_name, self.CRITICAL, tags=tags + scraper_config['custom_tags']) def kube_job_status_failed(self, metric, scraper_config): @@ -561,9 +591,9 @@ def kube_job_status_failed(self, metric, scraper_config): if label_name == 'job' or label_name == 'job_name': trimmed_job = self._trim_job_tag(label_value) job_ts = self._extract_job_timestamp(label_value) - tags.append(self._format_tag(label_name, trimmed_job, scraper_config)) + tags += self._build_tags(label_name, trimmed_job, scraper_config) else: - tags.append(self._format_tag(label_name, label_value, scraper_config)) + tags += self._build_tags(label_name, label_value, scraper_config) self.failed_job_counts[frozenset(tags)].update_current_ts_and_add_count(job_ts, sample[self.SAMPLE_VALUE]) def kube_job_status_succeeded(self, metric, scraper_config): @@ -574,9 +604,9 @@ def kube_job_status_succeeded(self, metric, scraper_config): if label_name == 'job' or label_name == 'job_name': trimmed_job = self._trim_job_tag(label_value) job_ts = self._extract_job_timestamp(label_value) - tags.append(self._format_tag(label_name, trimmed_job, scraper_config)) + tags += self._build_tags(label_name, trimmed_job, scraper_config) else: - tags.append(self._format_tag(label_name, label_value, scraper_config)) + tags += self._build_tags(label_name, label_value, scraper_config) self.succeeded_job_counts[frozenset(tags)].update_current_ts_and_add_count( job_ts, sample[self.SAMPLE_VALUE] ) @@ -588,21 +618,20 @@ def kube_node_status_condition(self, metric, scraper_config): by_condition_counter = Counter() for sample in metric.samples: - node_tag = self._label_to_tag("node", sample[self.SAMPLE_LABELS], scraper_config) + node_tags = self._label_to_tags("node", sample[self.SAMPLE_LABELS], scraper_config) self._condition_to_tag_check( sample, base_check_name, self.condition_to_status_positive, scraper_config, - tags=[node_tag] + scraper_config['custom_tags'], + tags=node_tags + scraper_config['custom_tags'], ) # Counts aggregated cluster-wide to avoid no-data issues on node churn, # node granularity available in the service checks - tags = [ - self._label_to_tag("condition", sample[self.SAMPLE_LABELS], scraper_config), - self._label_to_tag("status", sample[self.SAMPLE_LABELS], scraper_config), - ] + scraper_config['custom_tags'] + tags = self._label_to_tags("condition", sample[self.SAMPLE_LABELS], scraper_config) \ + + self._label_to_tags("status", sample[self.SAMPLE_LABELS], scraper_config) \ + + scraper_config['custom_tags'] by_condition_counter[tuple(sorted(tags))] += sample[self.SAMPLE_VALUE] for tags, count in iteritems(by_condition_counter): @@ -612,60 +641,60 @@ def kube_node_status_ready(self, metric, scraper_config): """ The ready status of a cluster node (legacy)""" service_check_name = scraper_config['namespace'] + '.node.ready' for sample in metric.samples: - node_tag = self._label_to_tag("node", sample[self.SAMPLE_LABELS], scraper_config) + node_tags = self._label_to_tags("node", sample[self.SAMPLE_LABELS], scraper_config) self._condition_to_service_check( sample, service_check_name, self.condition_to_status_positive, - tags=[node_tag] + 
scraper_config['custom_tags'], + tags=node_tags + scraper_config['custom_tags'], ) def kube_node_status_out_of_disk(self, metric, scraper_config): """ Whether the node is out of disk space (legacy)""" service_check_name = scraper_config['namespace'] + '.node.out_of_disk' for sample in metric.samples: - node_tag = self._label_to_tag("node", sample[self.SAMPLE_LABELS], scraper_config) + node_tags = self._label_to_tags("node", sample[self.SAMPLE_LABELS], scraper_config) self._condition_to_service_check( sample, service_check_name, self.condition_to_status_negative, - tags=[node_tag] + scraper_config['custom_tags'], + tags=node_tags + scraper_config['custom_tags'], ) def kube_node_status_memory_pressure(self, metric, scraper_config): """ Whether the node is in a memory pressure state (legacy)""" service_check_name = scraper_config['namespace'] + '.node.memory_pressure' for sample in metric.samples: - node_tag = self._label_to_tag("node", sample[self.SAMPLE_LABELS], scraper_config) + node_tags = self._label_to_tags("node", sample[self.SAMPLE_LABELS], scraper_config) self._condition_to_service_check( sample, service_check_name, self.condition_to_status_negative, - tags=[node_tag] + scraper_config['custom_tags'], + tags=node_tags + scraper_config['custom_tags'], ) def kube_node_status_disk_pressure(self, metric, scraper_config): """ Whether the node is in a disk pressure state (legacy)""" service_check_name = scraper_config['namespace'] + '.node.disk_pressure' for sample in metric.samples: - node_tag = self._label_to_tag("node", sample[self.SAMPLE_LABELS], scraper_config) + node_tags = self._label_to_tags("node", sample[self.SAMPLE_LABELS], scraper_config) self._condition_to_service_check( sample, service_check_name, self.condition_to_status_negative, - tags=[node_tag] + scraper_config['custom_tags'], + tags=node_tags + scraper_config['custom_tags'], ) def kube_node_status_network_unavailable(self, metric, scraper_config): """ Whether the node is in a network unavailable state (legacy)""" service_check_name = scraper_config['namespace'] + '.node.network_unavailable' for sample in metric.samples: - node_tag = self._label_to_tag("node", sample[self.SAMPLE_LABELS], scraper_config) + node_tags = self._label_to_tags("node", sample[self.SAMPLE_LABELS], scraper_config) self._condition_to_service_check( sample, service_check_name, self.condition_to_status_negative, - tags=[node_tag] + scraper_config['custom_tags'], + tags=node_tags + scraper_config['custom_tags'], ) def kube_node_spec_unschedulable(self, metric, scraper_config): @@ -674,13 +703,12 @@ def kube_node_spec_unschedulable(self, metric, scraper_config): statuses = ('schedulable', 'unschedulable') if metric.type in METRIC_TYPES: for sample in metric.samples: - tags = [ - self._format_tag(label_name, label_value, scraper_config) - for label_name, label_value in iteritems(sample[self.SAMPLE_LABELS]) - ] + tags = list() + for label_name, label_value in iteritems(sample[self.SAMPLE_LABELS]): + tags += self._build_tags(label_name, label_value, scraper_config) tags += scraper_config['custom_tags'] status = statuses[int(sample[self.SAMPLE_VALUE])] # value can be 0 or 1 - tags.append(self._format_tag('status', status, scraper_config)) + tags += self._build_tags('status', status, scraper_config) self.gauge(metric_name, 1, tags) # metric value is always one, value is on the tags else: self.log.error("Metric type %s unsupported for metric %s" % (metric.type, metric.name)) @@ -693,10 +721,9 @@ def kube_resourcequota(self, metric, scraper_config): for 
sample in metric.samples: mtype = sample[self.SAMPLE_LABELS].get("type") resource = sample[self.SAMPLE_LABELS].get("resource") - tags = [ - self._label_to_tag("namespace", sample[self.SAMPLE_LABELS], scraper_config), - self._label_to_tag("resourcequota", sample[self.SAMPLE_LABELS], scraper_config), - ] + scraper_config['custom_tags'] + tags = self._label_to_tags("namespace", sample[self.SAMPLE_LABELS], scraper_config) \ + + self._label_to_tags("resourcequota", sample[self.SAMPLE_LABELS], scraper_config) \ + + scraper_config['custom_tags'] self.gauge(metric_base_name.format(resource, suffixes[mtype]), sample[self.SAMPLE_VALUE], tags) else: self.log.error("Metric type %s unsupported for metric %s" % (metric.type, metric.name)) @@ -724,11 +751,11 @@ def kube_limitrange(self, metric, scraper_config): self.error("Constraint %s unsupported for metric %s" % (constraint, metric.name)) continue resource = sample[self.SAMPLE_LABELS].get("resource") - tags = [ - self._label_to_tag("namespace", sample[self.SAMPLE_LABELS], scraper_config), - self._label_to_tag("limitrange", sample[self.SAMPLE_LABELS], scraper_config), - self._label_to_tag("type", sample[self.SAMPLE_LABELS], scraper_config, tag_name="consumer_type"), - ] + scraper_config['custom_tags'] + tags = self._label_to_tags("namespace", sample[self.SAMPLE_LABELS], scraper_config) \ + + self._label_to_tags("limitrange", sample[self.SAMPLE_LABELS], scraper_config) \ + + self._label_to_tags("type", sample[self.SAMPLE_LABELS], scraper_config, tag_name="consumer_type") \ + + scraper_config['custom_tags'] self.gauge(metric_base_name.format(resource, constraint), sample[self.SAMPLE_VALUE], tags) else: self.log.error("Metric type %s unsupported for metric %s" % (metric.type, metric.name)) @@ -747,3 +774,33 @@ def count_objects_by_tags(self, metric, scraper_config): for tags, count in iteritems(object_counter): self.gauge(metric_name, count, tags=list(tags)) + + + def _build_tags(self, label_name, label_value, scraper_config, hostname=None): + """ + Build a list of formatted tags from the `label_name` parameter. It also depends on the + check configuration (the 'keep_ksm_labels' parameter). + """ + _tags = list() + # first use the labels_mapper + tag_name = scraper_config['labels_mapper'].get(label_name, label_name) + # then try to use the kube_labels_mapper + kube_tag_name = kube_labels_mapper.get(tag_name, tag_name) + _tags.append('{}:{}'.format(to_string(kube_tag_name), to_string(label_value))) + if self.keep_ksm_labels and (kube_tag_name != tag_name): + _tags.append('{}:{}'.format(to_string(tag_name), to_string(label_value))) + return _tags + + def _metric_tags(self, metric_name, val, sample, scraper_config, hostname=None): + """ + Redefine this method to allow label duplication during the migration phase + """ + custom_tags = scraper_config['custom_tags'] + _tags = list(custom_tags) + _tags.extend(scraper_config['_metric_tags']) + for label_name, label_value in iteritems(sample[self.SAMPLE_LABELS]): + if label_name not in scraper_config['exclude_labels']: + _tags += self._build_tags(label_name, label_value, scraper_config) + return self._finalize_tags_to_submit( + _tags, metric_name, val, sample, custom_tags=custom_tags, hostname=hostname + ) diff --git a/kubernetes_state/tests/test_kubernetes_state.py b/kubernetes_state/tests/test_kubernetes_state.py index 95a70ee9f74056..264c3b7fe49ace 100644 --- a/kubernetes_state/tests/test_kubernetes_state.py +++ b/kubernetes_state/tests/test_kubernetes_state.py @@ -279,19 +279,19 @@ def test_update_kube_state_metrics(aggregator, instance, check): # Make sure we send counts for all phases to avoid no-data graphing issues aggregator.assert_metric( - NAMESPACE + '.pod.status_phase', tags=['namespace:default', 'phase:Pending', 'optional:tag1'], value=1 + NAMESPACE + '.pod.status_phase', tags=['kube_namespace:default', 'namespace:default', 'phase:Pending', 'pod_phase:Pending', 'optional:tag1'], value=1 ) aggregator.assert_metric( - NAMESPACE + '.pod.status_phase', tags=['namespace:default', 'phase:Running', 'optional:tag1'], value=3 + NAMESPACE + '.pod.status_phase', tags=['kube_namespace:default', 'namespace:default', 'phase:Running', 'pod_phase:Running', 'optional:tag1'], value=3 ) aggregator.assert_metric( - NAMESPACE + '.pod.status_phase', tags=['namespace:default', 'phase:Succeeded', 'optional:tag1'], value=2 + NAMESPACE + '.pod.status_phase', tags=['kube_namespace:default', 'namespace:default', 'phase:Succeeded', 'pod_phase:Succeeded', 'optional:tag1'], value=2 ) aggregator.assert_metric( - NAMESPACE + '.pod.status_phase', tags=['namespace:default', 'phase:Failed', 'optional:tag1'], value=2 + NAMESPACE + '.pod.status_phase', tags=['kube_namespace:default', 'namespace:default', 'phase:Failed', 'pod_phase:Failed', 'optional:tag1'], value=2 ) aggregator.assert_metric( - NAMESPACE + '.pod.status_phase', tags=['namespace:default', 'phase:Unknown', 'optional:tag1'], value=1 + NAMESPACE + '.pod.status_phase', tags=['kube_namespace:default', 'namespace:default', 'phase:Unknown', 'pod_phase:Unknown', 'optional:tag1'], value=1 ) # Persistentvolume counts @@ -392,10 +392,10 @@ def test_pod_phase_gauges(aggregator, instance, check): for _ in range(2): check.check(instance) aggregator.assert_metric( - NAMESPACE + '.pod.status_phase', tags=['namespace:default', 'phase:Running', 'optional:tag1'], value=3 + NAMESPACE + '.pod.status_phase', tags=['kube_namespace:default', 'namespace:default', 'phase:Running', 'pod_phase:Running', 'optional:tag1'], value=3 ) aggregator.assert_metric( - NAMESPACE + '.pod.status_phase', tags=['namespace:default', 'phase:Failed', 'optional:tag1'], 
value=2 + NAMESPACE + '.pod.status_phase', tags=['kube_namespace:default', 'namespace:default', 'phase:Failed', 'pod_phase:Failed', 'optional:tag1'], value=2 ) @@ -419,19 +419,19 @@ def test_job_counts(aggregator, instance): for _ in range(2): check.check(instance) aggregator.assert_metric( - NAMESPACE + '.job.failed', tags=['namespace:default', 'job:hello', 'optional:tag1'], value=0 + NAMESPACE + '.job.failed', tags=['namespace:default', 'kube_namespace:default', 'kube_job:hello', 'job:hello', 'optional:tag1'], value=0 ) aggregator.assert_metric( - NAMESPACE + '.job.succeeded', tags=['namespace:default', 'job:hello', 'optional:tag1'], value=3 + NAMESPACE + '.job.succeeded', tags=['namespace:default', 'kube_namespace:default', 'kube_job:hello', 'job:hello', 'optional:tag1'], value=3 ) # Re-run check to make sure we don't count the same jobs check.check(instance) aggregator.assert_metric( - NAMESPACE + '.job.failed', tags=['namespace:default', 'job:hello', 'optional:tag1'], value=0 + NAMESPACE + '.job.failed', tags=['namespace:default', 'kube_namespace:default', 'kube_job:hello', 'job:hello', 'optional:tag1'], value=0 ) aggregator.assert_metric( - NAMESPACE + '.job.succeeded', tags=['namespace:default', 'job:hello', 'optional:tag1'], value=3 + NAMESPACE + '.job.succeeded', tags=['namespace:default', 'kube_namespace:default', 'kube_job:hello', 'job:hello', 'optional:tag1'], value=3 ) # Edit the payload and rerun the check @@ -447,10 +447,10 @@ def test_job_counts(aggregator, instance): check.poll = mock.MagicMock(return_value=MockResponse(payload, 'text/plain')) check.check(instance) aggregator.assert_metric( - NAMESPACE + '.job.failed', tags=['namespace:default', 'job:hello', 'optional:tag1'], value=1 + NAMESPACE + '.job.failed', tags=['namespace:default', 'kube_namespace:default', 'job:hello', 'kube_job:hello', 'optional:tag1'], value=1 ) aggregator.assert_metric( - NAMESPACE + '.job.succeeded', tags=['namespace:default', 'job:hello', 'optional:tag1'], value=4 + NAMESPACE + '.job.succeeded', tags=['namespace:default', 'kube_namespace:default', 'job:hello', 'kube_job:hello', 'optional:tag1'], value=4 ) @@ -481,3 +481,14 @@ def test_telemetry(aggregator, instance): tags=['resource_name:hpa', 'resource_namespace:ns1', 'optional:tag1'], value=8.0, ) + +def test_keep_ksm_labels_deactivated(aggregator, instance): + instance['keep_ksm_labels'] = False + check = KubernetesState(CHECK_NAME, {}, {}, [instance]) + check.poll = mock.MagicMock(return_value=MockResponse(mock_from_file("prometheus.txt"), 'text/plain')) + check.check(instance) + for _ in range(2): + check.check(instance) + aggregator.assert_metric( + NAMESPACE + '.pod.status_phase', tags=['kube_namespace:default', 'pod_phase:Running', 'optional:tag1'], value=3 + )
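
Reviewer note (not part of the patch): below is a minimal, self-contained sketch of the tag-mapping behavior that the new `_build_tags` helper introduces. It uses a trimmed-down `kube_labels_mapper` and leaves out the `scraper_config['labels_mapper']` lookup and the `to_string()` conversion that the real method performs; it only illustrates how `keep_ksm_labels` controls whether the legacy kube-state-metrics tag is emitted alongside the new `kubernetes`-style tag.

# Illustrative sketch only -- mirrors the patch's _build_tags logic in isolation.
kube_labels_mapper = {
    'namespace': 'kube_namespace',
    'pod': 'pod_name',
    'phase': 'pod_phase',
}

def build_tags(label_name, label_value, keep_ksm_labels=True):
    # Map the kube-state-metrics label name to the kubernetes check tag name.
    tag_name = kube_labels_mapper.get(label_name, label_name)
    tags = ['{}:{}'.format(tag_name, label_value)]
    # During the migration phase, also keep the legacy label as a tag.
    if keep_ksm_labels and tag_name != label_name:
        tags.append('{}:{}'.format(label_name, label_value))
    return tags

# Default (keep_ksm_labels=True): both tag sets are emitted, as asserted in the tests above.
assert build_tags('namespace', 'default') == ['kube_namespace:default', 'namespace:default']
# With keep_ksm_labels disabled: only the kubernetes-style tag remains.
assert build_tags('namespace', 'default', keep_ksm_labels=False) == ['kube_namespace:default']
# Labels without a mapping pass through unchanged.
assert build_tags('optional', 'tag1') == ['optional:tag1']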