From b041c6e97d1c5c0592a09c8d28e183b150225682 Mon Sep 17 00:00:00 2001 From: Simon Guerrier Date: Mon, 29 Jul 2019 15:49:10 +0200 Subject: [PATCH 01/18] fix job metrics and added some extra test --- .../kubernetes_state/kubernetes_state.py | 49 ++++++++++++++----- .../tests/test_kubernetes_state.py | 19 +++++++ 2 files changed, 57 insertions(+), 11 deletions(-) diff --git a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py index 83c3d6c2a8a9c..ed56aa1187165 100644 --- a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py +++ b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py @@ -35,6 +35,11 @@ class KubernetesState(OpenMetricsBaseCheck): See https://github.com/kubernetes/kube-state-metrics """ + class JobCount(): + def __init__(self): + self.count = 0 + self.last_job_ts = 0 + DEFAULT_METRIC_LIMIT = 0 def __init__(self, name, init_config, agentConfig, instances=None): @@ -80,21 +85,20 @@ def __init__(self, name, init_config, agentConfig, instances=None): 'kube_service_spec_type': self.count_objects_by_tags, } + # Handling jobs succeeded/failed counts + self.failed_job_counts = defaultdict(KubernetesState.JobCount) + self.succeeded_job_counts = defaultdict(KubernetesState.JobCount) + def check(self, instance): endpoint = instance.get('kube_state_url') - # Job counters are monotonic: they increase at every run of the job - # We want to send the delta via the `monotonic_count` method - self.job_succeeded_count = defaultdict(int) - self.job_failed_count = defaultdict(int) - scraper_config = self.config_map[endpoint] self.process(scraper_config, metric_transformers=self.METRIC_TRANSFORMERS) - for job_tags, job_count in iteritems(self.job_succeeded_count): - self.monotonic_count(scraper_config['namespace'] + '.job.succeeded', job_count, list(job_tags)) - for job_tags, job_count in iteritems(self.job_failed_count): - 
self.monotonic_count(scraper_config['namespace'] + '.job.failed', job_count, list(job_tags)) + for job_tags, job_count in iteritems(self.failed_job_counts): + self.monotonic_count(scraper_config['namespace'] + '.job.failed', job_count.count, list(job_tags)) + for job_tags, job_count in iteritems(self.succeeded_job_counts): + self.monotonic_count(scraper_config['namespace'] + '.job.succeeded', job_count.count, list(job_tags)) def _filter_metric(self, metric, scraper_config): if scraper_config['telemetry']: @@ -398,6 +402,19 @@ def _trim_job_tag(self, name): pattern = r"(-\d{4,10}$)" return re.sub(pattern, '', name) + def _extract_job_timestamp(self, name): + """ + Extract timestamp of job names if they match -(\\^.+\\-) - match everything until a `-` + """ + pattern = r"(^.+\-)" + job_ts = 0 + try: + job_ts = int(re.sub(pattern, '', name)) + except ValueError: + msg = 'Cannot extract ts from job name {}'.format(name) + self.log.debug(msg) + return job_ts + # Labels attached: namespace, pod # As a message the phase=Pending|Running|Succeeded|Failed|Unknown # From the phase the check will update its status @@ -510,25 +527,35 @@ def kube_job_failed(self, metric, scraper_config): def kube_job_status_failed(self, metric, scraper_config): for sample in metric.samples: + job_ts = 0 tags = [] + scraper_config['custom_tags'] for label_name, label_value in iteritems(sample[self.SAMPLE_LABELS]): if label_name == 'job' or label_name == 'job_name': trimmed_job = self._trim_job_tag(label_value) + job_ts = self._extract_job_timestamp(label_value) tags.append(self._format_tag(label_name, trimmed_job, scraper_config)) else: tags.append(self._format_tag(label_name, label_value, scraper_config)) - self.job_failed_count[frozenset(tags)] += sample[self.SAMPLE_VALUE] + if job_ts != 0 and job_ts > self.failed_job_counts[frozenset(tags)].last_job_ts: + self.failed_job_counts[frozenset(tags)].count += sample[self.SAMPLE_VALUE] + self.failed_job_counts[frozenset(tags)].last_job_ts = job_ts + 
job_count=self.failed_job_counts[frozenset(tags)] def kube_job_status_succeeded(self, metric, scraper_config): for sample in metric.samples: + job_ts = 0 tags = [] + scraper_config['custom_tags'] for label_name, label_value in iteritems(sample[self.SAMPLE_LABELS]): if label_name == 'job' or label_name == 'job_name': trimmed_job = self._trim_job_tag(label_value) + job_ts = self._extract_job_timestamp(label_value) tags.append(self._format_tag(label_name, trimmed_job, scraper_config)) else: tags.append(self._format_tag(label_name, label_value, scraper_config)) - self.job_succeeded_count[frozenset(tags)] += sample[self.SAMPLE_VALUE] + if job_ts != 0 and job_ts > self.succeeded_job_counts[frozenset(tags)].last_job_ts: + self.succeeded_job_counts[frozenset(tags)].count += sample[self.SAMPLE_VALUE] + self.succeeded_job_counts[frozenset(tags)].last_job_ts = job_ts + job_count=self.succeeded_job_counts[frozenset(tags)] def kube_node_status_condition(self, metric, scraper_config): """ The ready status of a cluster node. 
v1.0+""" diff --git a/kubernetes_state/tests/test_kubernetes_state.py b/kubernetes_state/tests/test_kubernetes_state.py index 5e3555941955a..14c014de13565 100644 --- a/kubernetes_state/tests/test_kubernetes_state.py +++ b/kubernetes_state/tests/test_kubernetes_state.py @@ -380,6 +380,25 @@ def test_pod_phase_gauges(aggregator, instance, check): NAMESPACE + '.pod.status_phase', tags=['namespace:default', 'phase:Failed', 'optional:tag1'], value=2 ) +def test_job_counts(aggregator, instance, check): + for _ in range(2): + check.check(instance) + aggregator.assert_metric( + NAMESPACE + '.job.failed', tags=['namespace:default', 'job:hello', 'optional:tag1'], value=0 + ) + aggregator.assert_metric( + NAMESPACE + '.job.succeeded', tags=['namespace:default', 'job:hello', 'optional:tag1'], value=3 + ) + + # Re-run check to make sure we don't count the same jobs + for _ in range(1): + check.check(instance) + aggregator.assert_metric( + NAMESPACE + '.job.failed', tags=['namespace:default', 'job:hello', 'optional:tag1'], value=0 + ) + aggregator.assert_metric( + NAMESPACE + '.job.succeeded', tags=['namespace:default', 'job:hello', 'optional:tag1'], value=3 + ) def test_telemetry(aggregator, instance): instance['telemetry'] = True From 364f80674477f94ca6599921a0cb56e59ea20e56 Mon Sep 17 00:00:00 2001 From: Simon Guerrier Date: Mon, 29 Jul 2019 16:28:38 +0200 Subject: [PATCH 02/18] fix flake8 test --- .../datadog_checks/kubernetes_state/kubernetes_state.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py index ed56aa1187165..37edec9a0fb11 100644 --- a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py +++ b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py @@ -539,7 +539,6 @@ def kube_job_status_failed(self, metric, scraper_config): if job_ts != 0 and job_ts > 
self.failed_job_counts[frozenset(tags)].last_job_ts: self.failed_job_counts[frozenset(tags)].count += sample[self.SAMPLE_VALUE] self.failed_job_counts[frozenset(tags)].last_job_ts = job_ts - job_count=self.failed_job_counts[frozenset(tags)] def kube_job_status_succeeded(self, metric, scraper_config): for sample in metric.samples: @@ -555,7 +554,6 @@ def kube_job_status_succeeded(self, metric, scraper_config): if job_ts != 0 and job_ts > self.succeeded_job_counts[frozenset(tags)].last_job_ts: self.succeeded_job_counts[frozenset(tags)].count += sample[self.SAMPLE_VALUE] self.succeeded_job_counts[frozenset(tags)].last_job_ts = job_ts - job_count=self.succeeded_job_counts[frozenset(tags)] def kube_node_status_condition(self, metric, scraper_config): """ The ready status of a cluster node. v1.0+""" From 3876675788d4f299b5b70b1ece38badc738f03d5 Mon Sep 17 00:00:00 2001 From: Simon Guerrier Date: Mon, 29 Jul 2019 16:29:44 +0200 Subject: [PATCH 03/18] fix flake8 test on test --- kubernetes_state/tests/test_kubernetes_state.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kubernetes_state/tests/test_kubernetes_state.py b/kubernetes_state/tests/test_kubernetes_state.py index 14c014de13565..9110632c2aa34 100644 --- a/kubernetes_state/tests/test_kubernetes_state.py +++ b/kubernetes_state/tests/test_kubernetes_state.py @@ -380,6 +380,7 @@ def test_pod_phase_gauges(aggregator, instance, check): NAMESPACE + '.pod.status_phase', tags=['namespace:default', 'phase:Failed', 'optional:tag1'], value=2 ) + def test_job_counts(aggregator, instance, check): for _ in range(2): check.check(instance) @@ -400,6 +401,7 @@ def test_job_counts(aggregator, instance, check): NAMESPACE + '.job.succeeded', tags=['namespace:default', 'job:hello', 'optional:tag1'], value=3 ) + def test_telemetry(aggregator, instance): instance['telemetry'] = True From 8cc2bc970e17952ea26259808923bd7c51c44c14 Mon Sep 17 00:00:00 2001 From: Simon Guerrier Date: Mon, 29 Jul 2019 18:19:26 +0200 Subject: [PATCH 
04/18] fix black test --- .../datadog_checks/kubernetes_state/kubernetes_state.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py index 37edec9a0fb11..2873e3c29b7e3 100644 --- a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py +++ b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py @@ -35,7 +35,7 @@ class KubernetesState(OpenMetricsBaseCheck): See https://github.com/kubernetes/kube-state-metrics """ - class JobCount(): + class JobCount: def __init__(self): self.count = 0 self.last_job_ts = 0 @@ -539,6 +539,7 @@ def kube_job_status_failed(self, metric, scraper_config): if job_ts != 0 and job_ts > self.failed_job_counts[frozenset(tags)].last_job_ts: self.failed_job_counts[frozenset(tags)].count += sample[self.SAMPLE_VALUE] self.failed_job_counts[frozenset(tags)].last_job_ts = job_ts + job_count=self.failed_job_counts[frozenset(tags)] def kube_job_status_succeeded(self, metric, scraper_config): for sample in metric.samples: @@ -554,6 +555,7 @@ def kube_job_status_succeeded(self, metric, scraper_config): if job_ts != 0 and job_ts > self.succeeded_job_counts[frozenset(tags)].last_job_ts: self.succeeded_job_counts[frozenset(tags)].count += sample[self.SAMPLE_VALUE] self.succeeded_job_counts[frozenset(tags)].last_job_ts = job_ts + job_count=self.succeeded_job_counts[frozenset(tags)] def kube_node_status_condition(self, metric, scraper_config): """ The ready status of a cluster node. 
v1.0+""" From 057546dae48b106226f5ede2243f996c90e725f4 Mon Sep 17 00:00:00 2001 From: Simon Guerrier Date: Mon, 29 Jul 2019 18:38:42 +0200 Subject: [PATCH 05/18] fix flake8 test again --- .../datadog_checks/kubernetes_state/kubernetes_state.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py index 2873e3c29b7e3..73d0f150b10a2 100644 --- a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py +++ b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py @@ -539,7 +539,6 @@ def kube_job_status_failed(self, metric, scraper_config): if job_ts != 0 and job_ts > self.failed_job_counts[frozenset(tags)].last_job_ts: self.failed_job_counts[frozenset(tags)].count += sample[self.SAMPLE_VALUE] self.failed_job_counts[frozenset(tags)].last_job_ts = job_ts - job_count=self.failed_job_counts[frozenset(tags)] def kube_job_status_succeeded(self, metric, scraper_config): for sample in metric.samples: @@ -555,7 +554,6 @@ def kube_job_status_succeeded(self, metric, scraper_config): if job_ts != 0 and job_ts > self.succeeded_job_counts[frozenset(tags)].last_job_ts: self.succeeded_job_counts[frozenset(tags)].count += sample[self.SAMPLE_VALUE] self.succeeded_job_counts[frozenset(tags)].last_job_ts = job_ts - job_count=self.succeeded_job_counts[frozenset(tags)] def kube_node_status_condition(self, metric, scraper_config): """ The ready status of a cluster node. 
v1.0+""" From d2d1640a03cebd72ec7e43c2e3b23819410c9293 Mon Sep 17 00:00:00 2001 From: Simon Guerrier Date: Wed, 7 Aug 2019 17:04:24 +0200 Subject: [PATCH 06/18] changed extract timestamp + added test --- .../kubernetes_state/kubernetes_state.py | 10 ++-------- kubernetes_state/tests/test_kubernetes_state.py | 7 +++++++ 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py index 73d0f150b10a2..d10bfa1d1ae6a 100644 --- a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py +++ b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py @@ -406,14 +406,8 @@ def _extract_job_timestamp(self, name): """ Extract timestamp of job names if they match -(\\^.+\\-) - match everything until a `-` """ - pattern = r"(^.+\-)" - job_ts = 0 - try: - job_ts = int(re.sub(pattern, '', name)) - except ValueError: - msg = 'Cannot extract ts from job name {}'.format(name) - self.log.debug(msg) - return job_ts + ts = name.split('-')[-1] + return int(ts) if ts.isdigit() else 0 # Labels attached: namespace, pod # As a message the phase=Pending|Running|Succeeded|Failed|Unknown diff --git a/kubernetes_state/tests/test_kubernetes_state.py b/kubernetes_state/tests/test_kubernetes_state.py index 9110632c2aa34..93a4808969815 100644 --- a/kubernetes_state/tests/test_kubernetes_state.py +++ b/kubernetes_state/tests/test_kubernetes_state.py @@ -380,6 +380,13 @@ def test_pod_phase_gauges(aggregator, instance, check): NAMESPACE + '.pod.status_phase', tags=['namespace:default', 'phase:Failed', 'optional:tag1'], value=2 ) +def test_extract_timestamp(check): + job_name = "hello2-1509998340" + job_name2 = "hello-2-1509998340" + result = check._extract_job_timestamp(job_name) + assert result == 1509998340 + result = check._extract_job_timestamp(job_name2) + assert result == 1509998340 def test_job_counts(aggregator, instance, check): 
for _ in range(2): From 179edebaf5e746cd4c3796813599370422065106 Mon Sep 17 00:00:00 2001 From: Simon Guerrier Date: Wed, 7 Aug 2019 17:08:42 +0200 Subject: [PATCH 07/18] changed extract timestamp to add log --- .../datadog_checks/kubernetes_state/kubernetes_state.py | 9 +++++++-- kubernetes_state/tests/test_kubernetes_state.py | 3 +++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py index d10bfa1d1ae6a..a8bf705bda402 100644 --- a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py +++ b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py @@ -404,10 +404,15 @@ def _trim_job_tag(self, name): def _extract_job_timestamp(self, name): """ - Extract timestamp of job names if they match -(\\^.+\\-) - match everything until a `-` + Extract timestamp of job names """ ts = name.split('-')[-1] - return int(ts) if ts.isdigit() else 0 + if ts.isdigit(): + return int(ts) + else: + msg = 'Cannot extract ts from job name {}'.format(name) + self.log.debug(msg) + return 0 # Labels attached: namespace, pod # As a message the phase=Pending|Running|Succeeded|Failed|Unknown diff --git a/kubernetes_state/tests/test_kubernetes_state.py b/kubernetes_state/tests/test_kubernetes_state.py index 93a4808969815..bf7bf93a2d7cb 100644 --- a/kubernetes_state/tests/test_kubernetes_state.py +++ b/kubernetes_state/tests/test_kubernetes_state.py @@ -383,10 +383,13 @@ def test_pod_phase_gauges(aggregator, instance, check): def test_extract_timestamp(check): job_name = "hello2-1509998340" job_name2 = "hello-2-1509998340" + job_name3 = "hello2" result = check._extract_job_timestamp(job_name) assert result == 1509998340 result = check._extract_job_timestamp(job_name2) assert result == 1509998340 + result = check._extract_job_timestamp(job_name3) + assert result == 0 def test_job_counts(aggregator, instance, check): for _ 
in range(2): From ef864f38f80c6d57e171e33a9b7296ced55ce965 Mon Sep 17 00:00:00 2001 From: Simon Guerrier Date: Mon, 12 Aug 2019 17:28:39 +0200 Subject: [PATCH 08/18] fixed issue when timestamp not in order. Added tests for the logic. --- .../kubernetes_state/kubernetes_state.py | 12 ++++---- .../tests/test_kubernetes_state.py | 30 +++++++++++++++---- 2 files changed, 31 insertions(+), 11 deletions(-) diff --git a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py index a8bf705bda402..201759966e110 100644 --- a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py +++ b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py @@ -38,7 +38,7 @@ class KubernetesState(OpenMetricsBaseCheck): class JobCount: def __init__(self): self.count = 0 - self.last_job_ts = 0 + self.last_jobs_ts = [] DEFAULT_METRIC_LIMIT = 0 @@ -535,9 +535,10 @@ def kube_job_status_failed(self, metric, scraper_config): tags.append(self._format_tag(label_name, trimmed_job, scraper_config)) else: tags.append(self._format_tag(label_name, label_value, scraper_config)) - if job_ts != 0 and job_ts > self.failed_job_counts[frozenset(tags)].last_job_ts: + if job_ts != 0 and job_ts not in self.failed_job_counts[frozenset(tags)].last_jobs_ts: + print("Add value to fail") self.failed_job_counts[frozenset(tags)].count += sample[self.SAMPLE_VALUE] - self.failed_job_counts[frozenset(tags)].last_job_ts = job_ts + self.failed_job_counts[frozenset(tags)].last_jobs_ts.append(job_ts) def kube_job_status_succeeded(self, metric, scraper_config): for sample in metric.samples: @@ -550,9 +551,10 @@ def kube_job_status_succeeded(self, metric, scraper_config): tags.append(self._format_tag(label_name, trimmed_job, scraper_config)) else: tags.append(self._format_tag(label_name, label_value, scraper_config)) - if job_ts != 0 and job_ts > self.succeeded_job_counts[frozenset(tags)].last_job_ts: + if job_ts != 0 and 
job_ts not in self.succeeded_job_counts[frozenset(tags)].last_jobs_ts: + print("Add value to success") self.succeeded_job_counts[frozenset(tags)].count += sample[self.SAMPLE_VALUE] - self.succeeded_job_counts[frozenset(tags)].last_job_ts = job_ts + self.succeeded_job_counts[frozenset(tags)].last_jobs_ts.append(job_ts) def kube_node_status_condition(self, metric, scraper_config): """ The ready status of a cluster node. v1.0+""" diff --git a/kubernetes_state/tests/test_kubernetes_state.py b/kubernetes_state/tests/test_kubernetes_state.py index bf7bf93a2d7cb..b52f4161fce31 100644 --- a/kubernetes_state/tests/test_kubernetes_state.py +++ b/kubernetes_state/tests/test_kubernetes_state.py @@ -208,6 +208,10 @@ def close(self): pass +def mock_from_file(fname): + with open(os.path.join(HERE, 'fixtures', fname), 'rb') as f: + return f.read() + @pytest.fixture def instance(): return {'host': 'foo', 'kube_state_url': 'http://foo', 'tags': ['optional:tag1'], 'telemetry': False} @@ -216,9 +220,7 @@ def instance(): @pytest.fixture def check(instance): check = KubernetesState(CHECK_NAME, {}, {}, [instance]) - with open(os.path.join(HERE, 'fixtures', 'prometheus.txt'), 'rb') as f: - check.poll = mock.MagicMock(return_value=MockResponse(f.read(), 'text/plain')) - + check.poll = mock.MagicMock(return_value=MockResponse(mock_from_file("prometheus.txt"), 'text/plain')) return check @@ -391,7 +393,11 @@ def test_extract_timestamp(check): result = check._extract_job_timestamp(job_name3) assert result == 0 -def test_job_counts(aggregator, instance, check): +def test_job_counts(aggregator, instance): + check = KubernetesState(CHECK_NAME, {}, {}, [instance]) + payload = mock_from_file("prometheus.txt") + check.poll = mock.MagicMock(return_value=MockResponse(payload, 'text/plain')) + for _ in range(2): check.check(instance) aggregator.assert_metric( @@ -411,13 +417,25 @@ def test_job_counts(aggregator, instance, check): NAMESPACE + '.job.succeeded', tags=['namespace:default', 'job:hello', 
'optional:tag1'], value=3 ) + # Edit the payload and rerun the check + payload = payload.replace(b'kube_job_status_succeeded{job="hello-1509998340",namespace="default"} 1', b'kube_job_status_succeeded{job="hello-1509998500",namespace="default"} 1') + payload = payload.replace(b'kube_job_status_failed{job="hello-1509998340",namespace="default"} 0', b'kube_job_status_failed{job="hello-1509998510",namespace="default"} 1') + + check.poll = mock.MagicMock(return_value=MockResponse(payload, 'text/plain')) + for _ in range(1): + check.check(instance) + aggregator.assert_metric( + NAMESPACE + '.job.failed', tags=['namespace:default', 'job:hello', 'optional:tag1'], value=1 + ) + aggregator.assert_metric( + NAMESPACE + '.job.succeeded', tags=['namespace:default', 'job:hello', 'optional:tag1'], value=4 + ) def test_telemetry(aggregator, instance): instance['telemetry'] = True check = KubernetesState(CHECK_NAME, {}, {}, [instance]) - with open(os.path.join(HERE, 'fixtures', 'prometheus.txt'), 'rb') as f: - check.poll = mock.MagicMock(return_value=MockResponse(f.read(), 'text/plain')) + check.poll = mock.MagicMock(return_value=MockResponse(mock_from_file("prometheus.txt"), 'text/plain')) endpoint = instance['kube_state_url'] scraper_config = check.config_map[endpoint] From 2cf01768779578f6c0e84a0a74bc03a88773d3b6 Mon Sep 17 00:00:00 2001 From: Simon Guerrier Date: Tue, 13 Aug 2019 12:34:15 +0200 Subject: [PATCH 09/18] changed logic to have one array that gets reseted instead of one growing out of bounds array --- .../kubernetes_state/kubernetes_state.py | 28 +++++++++++-------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py index 201759966e110..637dd42d9da41 100644 --- a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py +++ b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py @@ -38,7 
+38,7 @@ class KubernetesState(OpenMetricsBaseCheck): class JobCount: def __init__(self): self.count = 0 - self.last_jobs_ts = [] + self.last_job_ts = 0 DEFAULT_METRIC_LIMIT = 0 @@ -90,15 +90,23 @@ def __init__(self, name, init_config, agentConfig, instances=None): self.succeeded_job_counts = defaultdict(KubernetesState.JobCount) def check(self, instance): + self.failed_ts = defaultdict(list) + self.succeeded_ts = defaultdict(list) + endpoint = instance.get('kube_state_url') scraper_config = self.config_map[endpoint] self.process(scraper_config, metric_transformers=self.METRIC_TRANSFORMERS) - for job_tags, job_count in iteritems(self.failed_job_counts): - self.monotonic_count(scraper_config['namespace'] + '.job.failed', job_count.count, list(job_tags)) - for job_tags, job_count in iteritems(self.succeeded_job_counts): - self.monotonic_count(scraper_config['namespace'] + '.job.succeeded', job_count.count, list(job_tags)) + for job_tags, job in iteritems(self.failed_job_counts): + self.monotonic_count(scraper_config['namespace'] + '.job.failed', job.count, list(job_tags)) + if len(self.failed_ts[job_tags]) > 0: + job.last_job_ts = max(self.failed_ts[job_tags]) + + for job_tags, job in iteritems(self.succeeded_job_counts): + self.monotonic_count(scraper_config['namespace'] + '.job.succeeded', job.count, list(job_tags)) + if len(self.succeeded_ts[job_tags]) > 0: + job.last_job_ts = max(self.succeeded_ts[job_tags]) def _filter_metric(self, metric, scraper_config): if scraper_config['telemetry']: @@ -535,10 +543,9 @@ def kube_job_status_failed(self, metric, scraper_config): tags.append(self._format_tag(label_name, trimmed_job, scraper_config)) else: tags.append(self._format_tag(label_name, label_value, scraper_config)) - if job_ts != 0 and job_ts not in self.failed_job_counts[frozenset(tags)].last_jobs_ts: - print("Add value to fail") + if job_ts != 0 and job_ts > self.failed_job_counts[frozenset(tags)].last_job_ts and job_ts not in self.failed_ts[frozenset(tags)]: 
self.failed_job_counts[frozenset(tags)].count += sample[self.SAMPLE_VALUE] - self.failed_job_counts[frozenset(tags)].last_jobs_ts.append(job_ts) + self.failed_ts[frozenset(tags)].append(job_ts) def kube_job_status_succeeded(self, metric, scraper_config): for sample in metric.samples: @@ -551,10 +558,9 @@ def kube_job_status_succeeded(self, metric, scraper_config): tags.append(self._format_tag(label_name, trimmed_job, scraper_config)) else: tags.append(self._format_tag(label_name, label_value, scraper_config)) - if job_ts != 0 and job_ts not in self.succeeded_job_counts[frozenset(tags)].last_jobs_ts: - print("Add value to success") + if job_ts != 0 and job_ts > self.succeeded_job_counts[frozenset(tags)].last_job_ts and job_ts not in self.succeeded_ts[frozenset(tags)]: self.succeeded_job_counts[frozenset(tags)].count += sample[self.SAMPLE_VALUE] - self.succeeded_job_counts[frozenset(tags)].last_jobs_ts.append(job_ts) + self.succeeded_ts[frozenset(tags)].append(job_ts) def kube_node_status_condition(self, metric, scraper_config): """ The ready status of a cluster node. 
v1.0+""" From 463dbedfa2e95c39706d0aa80c546ec8681ddaa4 Mon Sep 17 00:00:00 2001 From: Simon Guerrier Date: Tue, 13 Aug 2019 13:42:07 +0200 Subject: [PATCH 10/18] removed the array as it was unecessary and work with timestamps over the current run and over all the runs --- .../kubernetes_state/kubernetes_state.py | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py index 637dd42d9da41..62d8d12ccab1b 100644 --- a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py +++ b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py @@ -90,8 +90,8 @@ def __init__(self, name, init_config, agentConfig, instances=None): self.succeeded_job_counts = defaultdict(KubernetesState.JobCount) def check(self, instance): - self.failed_ts = defaultdict(list) - self.succeeded_ts = defaultdict(list) + self.last_failed_ts = defaultdict(int) + self.last_succeeded_ts = defaultdict(int) endpoint = instance.get('kube_state_url') @@ -100,13 +100,13 @@ def check(self, instance): for job_tags, job in iteritems(self.failed_job_counts): self.monotonic_count(scraper_config['namespace'] + '.job.failed', job.count, list(job_tags)) - if len(self.failed_ts[job_tags]) > 0: - job.last_job_ts = max(self.failed_ts[job_tags]) + if self.last_failed_ts[job_tags] > 0: + job.last_job_ts = self.last_failed_ts[job_tags] for job_tags, job in iteritems(self.succeeded_job_counts): self.monotonic_count(scraper_config['namespace'] + '.job.succeeded', job.count, list(job_tags)) - if len(self.succeeded_ts[job_tags]) > 0: - job.last_job_ts = max(self.succeeded_ts[job_tags]) + if self.last_succeeded_ts[job_tags] > 0: + job.last_job_ts = self.last_succeeded_ts[job_tags] def _filter_metric(self, metric, scraper_config): if scraper_config['telemetry']: @@ -543,9 +543,10 @@ def kube_job_status_failed(self, metric, 
scraper_config): tags.append(self._format_tag(label_name, trimmed_job, scraper_config)) else: tags.append(self._format_tag(label_name, label_value, scraper_config)) - if job_ts != 0 and job_ts > self.failed_job_counts[frozenset(tags)].last_job_ts and job_ts not in self.failed_ts[frozenset(tags)]: + if job_ts != 0 and job_ts > self.failed_job_counts[frozenset(tags)].last_job_ts: self.failed_job_counts[frozenset(tags)].count += sample[self.SAMPLE_VALUE] - self.failed_ts[frozenset(tags)].append(job_ts) + if job_ts > self.last_failed_ts[frozenset(tags)]: + self.last_failed_ts[frozenset(tags)] = job_ts def kube_job_status_succeeded(self, metric, scraper_config): for sample in metric.samples: @@ -558,9 +559,10 @@ def kube_job_status_succeeded(self, metric, scraper_config): tags.append(self._format_tag(label_name, trimmed_job, scraper_config)) else: tags.append(self._format_tag(label_name, label_value, scraper_config)) - if job_ts != 0 and job_ts > self.succeeded_job_counts[frozenset(tags)].last_job_ts and job_ts not in self.succeeded_ts[frozenset(tags)]: + if job_ts != 0 and job_ts > self.succeeded_job_counts[frozenset(tags)].last_job_ts: self.succeeded_job_counts[frozenset(tags)].count += sample[self.SAMPLE_VALUE] - self.succeeded_ts[frozenset(tags)].append(job_ts) + if job_ts > self.last_succeeded_ts[frozenset(tags)]: + self.last_succeeded_ts[frozenset(tags)] = job_ts def kube_node_status_condition(self, metric, scraper_config): """ The ready status of a cluster node. 
v1.0+""" From 0d4c5b97eb4bd6446ee9fe89c2d14be9a8e6b5de Mon Sep 17 00:00:00 2001 From: Simon Guerrier Date: Tue, 13 Aug 2019 13:46:43 +0200 Subject: [PATCH 11/18] removed the array as it was unnecessary and work with timestamps over the current run and over all the runs --- .../kubernetes_state/kubernetes_state.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py index 62d8d12ccab1b..b58d0b6317368 100644 --- a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py +++ b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py @@ -39,6 +39,7 @@ class JobCount: def __init__(self): self.count = 0 self.last_job_ts = 0 + self.last_job_ts_current = 0 DEFAULT_METRIC_LIMIT = 0 @@ -100,13 +101,15 @@ def check(self, instance): for job_tags, job in iteritems(self.failed_job_counts): self.monotonic_count(scraper_config['namespace'] + '.job.failed', job.count, list(job_tags)) - if self.last_failed_ts[job_tags] > 0: - job.last_job_ts = self.last_failed_ts[job_tags] + if job.last_job_ts_current > 0: + job.last_job_ts = job.last_job_ts_current + job.last_job_ts_current = 0 for job_tags, job in iteritems(self.succeeded_job_counts): self.monotonic_count(scraper_config['namespace'] + '.job.succeeded', job.count, list(job_tags)) - if self.last_succeeded_ts[job_tags] > 0: - job.last_job_ts = self.last_succeeded_ts[job_tags] + if job.last_job_ts_current > 0: + job.last_job_ts = job.last_job_ts_current + job.last_job_ts_current = 0 def _filter_metric(self, metric, scraper_config): if scraper_config['telemetry']: @@ -545,8 +548,8 @@ def kube_job_status_failed(self, metric, scraper_config): tags.append(self._format_tag(label_name, label_value, scraper_config)) if job_ts != 0 and job_ts > self.failed_job_counts[frozenset(tags)].last_job_ts: self.failed_job_counts[frozenset(tags)].count += 
sample[self.SAMPLE_VALUE] - if job_ts > self.last_failed_ts[frozenset(tags)]: - self.last_failed_ts[frozenset(tags)] = job_ts + if job_ts > self.failed_job_counts[frozenset(tags)].last_job_ts_current: + self.failed_job_counts[frozenset(tags)].last_job_ts_current = job_ts def kube_job_status_succeeded(self, metric, scraper_config): for sample in metric.samples: @@ -561,8 +564,8 @@ def kube_job_status_succeeded(self, metric, scraper_config): tags.append(self._format_tag(label_name, label_value, scraper_config)) if job_ts != 0 and job_ts > self.succeeded_job_counts[frozenset(tags)].last_job_ts: self.succeeded_job_counts[frozenset(tags)].count += sample[self.SAMPLE_VALUE] - if job_ts > self.last_succeeded_ts[frozenset(tags)]: - self.last_succeeded_ts[frozenset(tags)] = job_ts + if job_ts > self.succeeded_job_counts[frozenset(tags)].last_job_ts_current: + self.succeeded_job_counts[frozenset(tags)].last_job_ts_current = job_ts def kube_node_status_condition(self, metric, scraper_config): """ The ready status of a cluster node. 
v1.0+""" From 9372c09be55942a84ab946bebfad51abe89a731a Mon Sep 17 00:00:00 2001 From: Simon Guerrier Date: Tue, 13 Aug 2019 13:53:12 +0200 Subject: [PATCH 12/18] fixed flake8 --- kubernetes_state/tests/test_kubernetes_state.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/kubernetes_state/tests/test_kubernetes_state.py b/kubernetes_state/tests/test_kubernetes_state.py index b52f4161fce31..076e2e870ff97 100644 --- a/kubernetes_state/tests/test_kubernetes_state.py +++ b/kubernetes_state/tests/test_kubernetes_state.py @@ -212,6 +212,7 @@ def mock_from_file(fname): with open(os.path.join(HERE, 'fixtures', fname), 'rb') as f: return f.read() + @pytest.fixture def instance(): return {'host': 'foo', 'kube_state_url': 'http://foo', 'tags': ['optional:tag1'], 'telemetry': False} @@ -382,6 +383,7 @@ def test_pod_phase_gauges(aggregator, instance, check): NAMESPACE + '.pod.status_phase', tags=['namespace:default', 'phase:Failed', 'optional:tag1'], value=2 ) + def test_extract_timestamp(check): job_name = "hello2-1509998340" job_name2 = "hello-2-1509998340" @@ -393,6 +395,7 @@ def test_extract_timestamp(check): result = check._extract_job_timestamp(job_name3) assert result == 0 + def test_job_counts(aggregator, instance): check = KubernetesState(CHECK_NAME, {}, {}, [instance]) payload = mock_from_file("prometheus.txt") @@ -418,8 +421,10 @@ def test_job_counts(aggregator, instance): ) # Edit the payload and rerun the check - payload = payload.replace(b'kube_job_status_succeeded{job="hello-1509998340",namespace="default"} 1', b'kube_job_status_succeeded{job="hello-1509998500",namespace="default"} 1') - payload = payload.replace(b'kube_job_status_failed{job="hello-1509998340",namespace="default"} 0', b'kube_job_status_failed{job="hello-1509998510",namespace="default"} 1') + payload = payload.replace(b'kube_job_status_succeeded{job="hello-1509998340",namespace="default"} 1', + b'kube_job_status_succeeded{job="hello-1509998500",namespace="default"} 
1') + payload = payload.replace(b'kube_job_status_failed{job="hello-1509998340",namespace="default"} 0', + b'kube_job_status_failed{job="hello-1509998510",namespace="default"} 1') check.poll = mock.MagicMock(return_value=MockResponse(payload, 'text/plain')) for _ in range(1): @@ -431,6 +436,7 @@ def test_job_counts(aggregator, instance): NAMESPACE + '.job.succeeded', tags=['namespace:default', 'job:hello', 'optional:tag1'], value=4 ) + def test_telemetry(aggregator, instance): instance['telemetry'] = True From 847d0a27529dd28f3712bdfa95724abb6325fc0c Mon Sep 17 00:00:00 2001 From: Simon Guerrier Date: Tue, 13 Aug 2019 17:34:41 +0200 Subject: [PATCH 13/18] fixed flake8 --- kubernetes_state/tests/test_kubernetes_state.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/kubernetes_state/tests/test_kubernetes_state.py b/kubernetes_state/tests/test_kubernetes_state.py index 076e2e870ff97..a83601a5ce543 100644 --- a/kubernetes_state/tests/test_kubernetes_state.py +++ b/kubernetes_state/tests/test_kubernetes_state.py @@ -421,9 +421,11 @@ def test_job_counts(aggregator, instance): ) # Edit the payload and rerun the check - payload = payload.replace(b'kube_job_status_succeeded{job="hello-1509998340",namespace="default"} 1', + payload = payload.replace( + b'kube_job_status_succeeded{job="hello-1509998340",namespace="default"} 1', b'kube_job_status_succeeded{job="hello-1509998500",namespace="default"} 1') - payload = payload.replace(b'kube_job_status_failed{job="hello-1509998340",namespace="default"} 0', + payload = payload.replace( + b'kube_job_status_failed{job="hello-1509998340",namespace="default"} 0', b'kube_job_status_failed{job="hello-1509998510",namespace="default"} 1') check.poll = mock.MagicMock(return_value=MockResponse(payload, 'text/plain')) From 2272040407a56cc62ab4b8a8095966d2aae75b4a Mon Sep 17 00:00:00 2001 From: Simon Guerrier Date: Tue, 13 Aug 2019 17:43:47 +0200 Subject: [PATCH 14/18] fixed brake --- 
kubernetes_state/tests/test_kubernetes_state.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/kubernetes_state/tests/test_kubernetes_state.py b/kubernetes_state/tests/test_kubernetes_state.py index a83601a5ce543..9f40d6de0693b 100644 --- a/kubernetes_state/tests/test_kubernetes_state.py +++ b/kubernetes_state/tests/test_kubernetes_state.py @@ -423,10 +423,12 @@ def test_job_counts(aggregator, instance): # Edit the payload and rerun the check payload = payload.replace( b'kube_job_status_succeeded{job="hello-1509998340",namespace="default"} 1', - b'kube_job_status_succeeded{job="hello-1509998500",namespace="default"} 1') + b'kube_job_status_succeeded{job="hello-1509998500",namespace="default"} 1' + ) payload = payload.replace( b'kube_job_status_failed{job="hello-1509998340",namespace="default"} 0', - b'kube_job_status_failed{job="hello-1509998510",namespace="default"} 1') + b'kube_job_status_failed{job="hello-1509998510",namespace="default"} 1' + ) check.poll = mock.MagicMock(return_value=MockResponse(payload, 'text/plain')) for _ in range(1): From dfa1ab91b2603b7bfbb05e3408b62d7817de378a Mon Sep 17 00:00:00 2001 From: Simon Guerrier Date: Tue, 13 Aug 2019 18:39:18 +0200 Subject: [PATCH 15/18] fix one last time black --- kubernetes_state/tests/test_kubernetes_state.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kubernetes_state/tests/test_kubernetes_state.py b/kubernetes_state/tests/test_kubernetes_state.py index 9f40d6de0693b..3d7f97a2b9d79 100644 --- a/kubernetes_state/tests/test_kubernetes_state.py +++ b/kubernetes_state/tests/test_kubernetes_state.py @@ -423,11 +423,11 @@ def test_job_counts(aggregator, instance): # Edit the payload and rerun the check payload = payload.replace( b'kube_job_status_succeeded{job="hello-1509998340",namespace="default"} 1', - b'kube_job_status_succeeded{job="hello-1509998500",namespace="default"} 1' + b'kube_job_status_succeeded{job="hello-1509998500",namespace="default"} 1', ) 
payload = payload.replace( b'kube_job_status_failed{job="hello-1509998340",namespace="default"} 0', - b'kube_job_status_failed{job="hello-1509998510",namespace="default"} 1' + b'kube_job_status_failed{job="hello-1509998510",namespace="default"} 1', ) check.poll = mock.MagicMock(return_value=MockResponse(payload, 'text/plain')) From 8339f95cdfa82aa74a78a7955be110c77a1af428 Mon Sep 17 00:00:00 2001 From: Simon Guerrier Date: Tue, 13 Aug 2019 19:34:21 +0200 Subject: [PATCH 16/18] removed two unused dict --- .../datadog_checks/kubernetes_state/kubernetes_state.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py index b58d0b6317368..befe1478b0b3e 100644 --- a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py +++ b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py @@ -91,9 +91,6 @@ def __init__(self, name, init_config, agentConfig, instances=None): self.succeeded_job_counts = defaultdict(KubernetesState.JobCount) def check(self, instance): - self.last_failed_ts = defaultdict(int) - self.last_succeeded_ts = defaultdict(int) - endpoint = instance.get('kube_state_url') scraper_config = self.config_map[endpoint] From d3e1cc87c18bb732beb6918fed3b199997d8a6b1 Mon Sep 17 00:00:00 2001 From: Simon Guerrier Date: Tue, 13 Aug 2019 19:36:24 +0200 Subject: [PATCH 17/18] changed var names --- .../kubernetes_state/kubernetes_state.py | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py index befe1478b0b3e..f525ae63972c5 100644 --- a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py +++ b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py @@ -38,8 +38,8 @@ class KubernetesState(OpenMetricsBaseCheck): 
class JobCount: def __init__(self): self.count = 0 - self.last_job_ts = 0 - self.last_job_ts_current = 0 + self.previous_run_max_ts = 0 + self.current_run_max_ts = 0 DEFAULT_METRIC_LIMIT = 0 @@ -98,15 +98,15 @@ def check(self, instance): for job_tags, job in iteritems(self.failed_job_counts): self.monotonic_count(scraper_config['namespace'] + '.job.failed', job.count, list(job_tags)) - if job.last_job_ts_current > 0: - job.last_job_ts = job.last_job_ts_current - job.last_job_ts_current = 0 + if job.current_run_max_ts > 0: + job.previous_run_max_ts = job.current_run_max_ts + job.current_run_max_ts = 0 for job_tags, job in iteritems(self.succeeded_job_counts): self.monotonic_count(scraper_config['namespace'] + '.job.succeeded', job.count, list(job_tags)) - if job.last_job_ts_current > 0: - job.last_job_ts = job.last_job_ts_current - job.last_job_ts_current = 0 + if job.current_run_max_ts > 0: + job.previous_run_max_ts = job.current_run_max_ts + job.current_run_max_ts = 0 def _filter_metric(self, metric, scraper_config): if scraper_config['telemetry']: @@ -543,10 +543,10 @@ def kube_job_status_failed(self, metric, scraper_config): tags.append(self._format_tag(label_name, trimmed_job, scraper_config)) else: tags.append(self._format_tag(label_name, label_value, scraper_config)) - if job_ts != 0 and job_ts > self.failed_job_counts[frozenset(tags)].last_job_ts: + if job_ts != 0 and job_ts > self.failed_job_counts[frozenset(tags)].previous_run_max_ts: self.failed_job_counts[frozenset(tags)].count += sample[self.SAMPLE_VALUE] - if job_ts > self.failed_job_counts[frozenset(tags)].last_job_ts_current: - self.failed_job_counts[frozenset(tags)].last_job_ts_current = job_ts + if job_ts > self.failed_job_counts[frozenset(tags)].current_run_max_ts: + self.failed_job_counts[frozenset(tags)].current_run_max_ts = job_ts def kube_job_status_succeeded(self, metric, scraper_config): for sample in metric.samples: @@ -559,10 +559,10 @@ def kube_job_status_succeeded(self, metric, 
scraper_config): tags.append(self._format_tag(label_name, trimmed_job, scraper_config)) else: tags.append(self._format_tag(label_name, label_value, scraper_config)) - if job_ts != 0 and job_ts > self.succeeded_job_counts[frozenset(tags)].last_job_ts: + if job_ts != 0 and job_ts > self.succeeded_job_counts[frozenset(tags)].previous_run_max_ts: self.succeeded_job_counts[frozenset(tags)].count += sample[self.SAMPLE_VALUE] - if job_ts > self.succeeded_job_counts[frozenset(tags)].last_job_ts_current: - self.succeeded_job_counts[frozenset(tags)].last_job_ts_current = job_ts + if job_ts > self.succeeded_job_counts[frozenset(tags)].current_run_max_ts: + self.succeeded_job_counts[frozenset(tags)].current_run_max_ts = job_ts def kube_node_status_condition(self, metric, scraper_config): """ The ready status of a cluster node. v1.0+""" From 9d7facd9b5082da5a6b449b898cb78a1da34e163 Mon Sep 17 00:00:00 2001 From: Simon Guerrier Date: Wed, 14 Aug 2019 15:17:28 +0200 Subject: [PATCH 18/18] modified following the good comments and added few methods to the JobCount class --- .../kubernetes_state/kubernetes_state.py | 34 ++++++++++--------- .../tests/test_kubernetes_state.py | 6 ++-- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py index f525ae63972c5..2d3580f628b1d 100644 --- a/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py +++ b/kubernetes_state/datadog_checks/kubernetes_state/kubernetes_state.py @@ -41,6 +41,16 @@ def __init__(self): self.previous_run_max_ts = 0 self.current_run_max_ts = 0 + def set_previous_and_reset_current_ts(self): + if self.current_run_max_ts > 0: + self.previous_run_max_ts = self.current_run_max_ts + self.current_run_max_ts = 0 + + def update_current_ts_and_add_count(self, job_ts, count): + if job_ts != 0 and job_ts > self.previous_run_max_ts: + self.count += count + 
self.current_run_max_ts = max(self.current_run_max_ts, job_ts) + DEFAULT_METRIC_LIMIT = 0 def __init__(self, name, init_config, agentConfig, instances=None): @@ -98,15 +108,11 @@ def check(self, instance): for job_tags, job in iteritems(self.failed_job_counts): self.monotonic_count(scraper_config['namespace'] + '.job.failed', job.count, list(job_tags)) - if job.current_run_max_ts > 0: - job.previous_run_max_ts = job.current_run_max_ts - job.current_run_max_ts = 0 + job.set_previous_and_reset_current_ts() for job_tags, job in iteritems(self.succeeded_job_counts): self.monotonic_count(scraper_config['namespace'] + '.job.succeeded', job.count, list(job_tags)) - if job.current_run_max_ts > 0: - job.previous_run_max_ts = job.current_run_max_ts - job.current_run_max_ts = 0 + job.set_previous_and_reset_current_ts() def _filter_metric(self, metric, scraper_config): if scraper_config['telemetry']: @@ -418,8 +424,8 @@ def _extract_job_timestamp(self, name): if ts.isdigit(): return int(ts) else: - msg = 'Cannot extract ts from job name {}'.format(name) - self.log.debug(msg) + msg = 'Cannot extract ts from job name {}' + self.log.debug(msg, name) return 0 # Labels attached: namespace, pod @@ -543,10 +549,7 @@ def kube_job_status_failed(self, metric, scraper_config): tags.append(self._format_tag(label_name, trimmed_job, scraper_config)) else: tags.append(self._format_tag(label_name, label_value, scraper_config)) - if job_ts != 0 and job_ts > self.failed_job_counts[frozenset(tags)].previous_run_max_ts: - self.failed_job_counts[frozenset(tags)].count += sample[self.SAMPLE_VALUE] - if job_ts > self.failed_job_counts[frozenset(tags)].current_run_max_ts: - self.failed_job_counts[frozenset(tags)].current_run_max_ts = job_ts + self.failed_job_counts[frozenset(tags)].update_current_ts_and_add_count(job_ts, sample[self.SAMPLE_VALUE]) def kube_job_status_succeeded(self, metric, scraper_config): for sample in metric.samples: @@ -559,10 +562,9 @@ def kube_job_status_succeeded(self, metric, 
scraper_config): tags.append(self._format_tag(label_name, trimmed_job, scraper_config)) else: tags.append(self._format_tag(label_name, label_value, scraper_config)) - if job_ts != 0 and job_ts > self.succeeded_job_counts[frozenset(tags)].previous_run_max_ts: - self.succeeded_job_counts[frozenset(tags)].count += sample[self.SAMPLE_VALUE] - if job_ts > self.succeeded_job_counts[frozenset(tags)].current_run_max_ts: - self.succeeded_job_counts[frozenset(tags)].current_run_max_ts = job_ts + self.succeeded_job_counts[frozenset(tags)].update_current_ts_and_add_count( + job_ts, sample[self.SAMPLE_VALUE] + ) def kube_node_status_condition(self, metric, scraper_config): """ The ready status of a cluster node. v1.0+""" diff --git a/kubernetes_state/tests/test_kubernetes_state.py b/kubernetes_state/tests/test_kubernetes_state.py index 3d7f97a2b9d79..e54e683ca9922 100644 --- a/kubernetes_state/tests/test_kubernetes_state.py +++ b/kubernetes_state/tests/test_kubernetes_state.py @@ -411,8 +411,7 @@ def test_job_counts(aggregator, instance): ) # Re-run check to make sure we don't count the same jobs - for _ in range(1): - check.check(instance) + check.check(instance) aggregator.assert_metric( NAMESPACE + '.job.failed', tags=['namespace:default', 'job:hello', 'optional:tag1'], value=0 ) @@ -431,8 +430,7 @@ def test_job_counts(aggregator, instance): ) check.poll = mock.MagicMock(return_value=MockResponse(payload, 'text/plain')) - for _ in range(1): - check.check(instance) + check.check(instance) aggregator.assert_metric( NAMESPACE + '.job.failed', tags=['namespace:default', 'job:hello', 'optional:tag1'], value=1 )