Fix KSM job metrics #4224

Merged (18 commits, Aug 19, 2019)
Changes from 8 commits
@@ -35,6 +35,11 @@ class KubernetesState(OpenMetricsBaseCheck):
See https://github.com/kubernetes/kube-state-metrics
"""

class JobCount:
def __init__(self):
self.count = 0
self.last_jobs_ts = []

DEFAULT_METRIC_LIMIT = 0

def __init__(self, name, init_config, agentConfig, instances=None):
@@ -80,21 +85,20 @@ def __init__(self, name, init_config, agentConfig, instances=None):
'kube_service_spec_type': self.count_objects_by_tags,
}

# Handling jobs succeeded/failed counts
self.failed_job_counts = defaultdict(KubernetesState.JobCount)
self.succeeded_job_counts = defaultdict(KubernetesState.JobCount)

def check(self, instance):
endpoint = instance.get('kube_state_url')

# Job counters are monotonic: they increase at every run of the job
# We want to send the delta via the `monotonic_count` method
self.job_succeeded_count = defaultdict(int)
self.job_failed_count = defaultdict(int)

scraper_config = self.config_map[endpoint]
self.process(scraper_config, metric_transformers=self.METRIC_TRANSFORMERS)

for job_tags, job_count in iteritems(self.job_succeeded_count):
self.monotonic_count(scraper_config['namespace'] + '.job.succeeded', job_count, list(job_tags))
for job_tags, job_count in iteritems(self.job_failed_count):
self.monotonic_count(scraper_config['namespace'] + '.job.failed', job_count, list(job_tags))
for job_tags, job_count in iteritems(self.failed_job_counts):
self.monotonic_count(scraper_config['namespace'] + '.job.failed', job_count.count, list(job_tags))
for job_tags, job_count in iteritems(self.succeeded_job_counts):
self.monotonic_count(scraper_config['namespace'] + '.job.succeeded', job_count.count, list(job_tags))

def _filter_metric(self, metric, scraper_config):
if scraper_config['telemetry']:
@@ -398,6 +402,18 @@ def _trim_job_tag(self, name):
pattern = r"(-\d{4,10}$)"
return re.sub(pattern, '', name)

def _extract_job_timestamp(self, name):
"""
Extract the timestamp from a job name
"""
ts = name.split('-')[-1]
if ts.isdigit():
return int(ts)
else:
msg = 'Cannot extract ts from job name {}'.format(name)
self.log.debug(msg)
Contributor: It's better to do log.debug(msg, name) so the message isn't formatted when the logging level isn't debug.

return 0
therve marked this conversation as resolved.
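
A minimal standalone sketch of the lazy formatting the reviewer suggests, assuming self.log behaves like a standard logging.Logger (which only interpolates the arguments when the record is actually emitted):

import logging

log = logging.getLogger(__name__)


def extract_job_timestamp(name):
    """Extract the trailing timestamp from a job name, or 0 if there is none."""
    ts = name.split('-')[-1]
    if ts.isdigit():
        return int(ts)
    # Passing `name` as a separate argument means the string is only built
    # when DEBUG logging is enabled; nothing is formatted otherwise.
    log.debug('Cannot extract ts from job name %s', name)
    return 0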

# Labels attached: namespace, pod
# As a message the phase=Pending|Running|Succeeded|Failed|Unknown
# From the phase the check will update its status
@@ -510,25 +526,35 @@ def kube_job_failed(self, metric, scraper_config):

def kube_job_status_failed(self, metric, scraper_config):
for sample in metric.samples:
job_ts = 0
tags = [] + scraper_config['custom_tags']
for label_name, label_value in iteritems(sample[self.SAMPLE_LABELS]):
if label_name == 'job' or label_name == 'job_name':
trimmed_job = self._trim_job_tag(label_value)
job_ts = self._extract_job_timestamp(label_value)
tags.append(self._format_tag(label_name, trimmed_job, scraper_config))
else:
tags.append(self._format_tag(label_name, label_value, scraper_config))
self.job_failed_count[frozenset(tags)] += sample[self.SAMPLE_VALUE]
if job_ts != 0 and job_ts not in self.failed_job_counts[frozenset(tags)].last_jobs_ts:
print("Add value to fail")
Member: That's fine for testing, but let's remove it before merging.

self.failed_job_counts[frozenset(tags)].count += sample[self.SAMPLE_VALUE]
self.failed_job_counts[frozenset(tags)].last_jobs_ts.append(job_ts)
Member: That's better because this gives an accurate count, but now last_jobs_ts grows unbounded, increasing memory usage over time.

To highlight that, you can add a print(len(self.failed_job_counts[frozenset(tags)].last_jobs_ts)) and run the tests with new job executions. I think the list will grow every time the cronjob triggers and you get a new timestamp.

Simwar marked this conversation as resolved.
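
One possible way to bound that growth, sketched here as a hypothetical alternative and not necessarily the change that gets merged: keep only the newest timestamp already counted for each tag set instead of an ever-growing list. The sketch assumes samples arrive in roughly timestamp order; out-of-order samples would need a small bounded set instead.

from collections import defaultdict


class JobCount:
    """Per-tag-set counter that keeps a single timestamp instead of a list."""

    def __init__(self):
        self.count = 0
        self.previous_ts = 0  # newest job timestamp already counted


failed_job_counts = defaultdict(JobCount)


def record_failed(tags, job_ts, value):
    # Count only jobs newer than the last one seen for this tag set, so
    # re-reading the same sample on the next check run adds nothing and
    # memory stays constant per tag set.
    counter = failed_job_counts[frozenset(tags)]
    if job_ts > counter.previous_ts:
        counter.count += value
        counter.previous_ts = job_ts


record_failed(['namespace:default', 'job:hello'], 1509998340, 1)
record_failed(['namespace:default', 'job:hello'], 1509998340, 1)  # same run seen again: ignored
assert failed_job_counts[frozenset(['namespace:default', 'job:hello'])].count == 1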
def kube_job_status_succeeded(self, metric, scraper_config):
for sample in metric.samples:
job_ts = 0
tags = [] + scraper_config['custom_tags']
for label_name, label_value in iteritems(sample[self.SAMPLE_LABELS]):
if label_name == 'job' or label_name == 'job_name':
trimmed_job = self._trim_job_tag(label_value)
job_ts = self._extract_job_timestamp(label_value)
tags.append(self._format_tag(label_name, trimmed_job, scraper_config))
else:
tags.append(self._format_tag(label_name, label_value, scraper_config))
self.job_succeeded_count[frozenset(tags)] += sample[self.SAMPLE_VALUE]
if job_ts != 0 and job_ts not in self.succeeded_job_counts[frozenset(tags)].last_jobs_ts:
print("Add value to success")
Member: Same as above: that print is fine for testing, but remove it before merging.

self.succeeded_job_counts[frozenset(tags)].count += sample[self.SAMPLE_VALUE]
self.succeeded_job_counts[frozenset(tags)].last_jobs_ts.append(job_ts)
Member: Same issue with memory usage here.


def kube_node_status_condition(self, metric, scraper_config):
""" The ready status of a cluster node. v1.0+"""
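
As a quick worked illustration of the name handling above, here are standalone copies of the two helpers, applied to the hello-1509998340 naming used in the tests below. The numeric suffix on a spawned Job's name is stripped to get a stable tag value and reused as the dedup timestamp.

import re


def trim_job_tag(name):
    # Drop the 4-10 digit suffix appended to each spawned Job's name.
    return re.sub(r"(-\d{4,10}$)", '', name)


def extract_job_timestamp(name):
    # The same suffix doubles as the job's timestamp; 0 means none was found.
    ts = name.split('-')[-1]
    return int(ts) if ts.isdigit() else 0


assert trim_job_tag('hello-1509998340') == 'hello'
assert extract_job_timestamp('hello-1509998340') == 1509998340
assert extract_job_timestamp('hello') == 0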
59 changes: 54 additions & 5 deletions kubernetes_state/tests/test_kubernetes_state.py
@@ -208,6 +208,10 @@ def close(self):
pass


def mock_from_file(fname):
with open(os.path.join(HERE, 'fixtures', fname), 'rb') as f:
return f.read()

@pytest.fixture
def instance():
return {'host': 'foo', 'kube_state_url': 'http://foo', 'tags': ['optional:tag1'], 'telemetry': False}
@@ -216,9 +220,7 @@ def instance():
@pytest.fixture
def check(instance):
check = KubernetesState(CHECK_NAME, {}, {}, [instance])
with open(os.path.join(HERE, 'fixtures', 'prometheus.txt'), 'rb') as f:
check.poll = mock.MagicMock(return_value=MockResponse(f.read(), 'text/plain'))

check.poll = mock.MagicMock(return_value=MockResponse(mock_from_file("prometheus.txt"), 'text/plain'))
return check


@@ -380,13 +382,60 @@ def test_pod_phase_gauges(aggregator, instance, check):
NAMESPACE + '.pod.status_phase', tags=['namespace:default', 'phase:Failed', 'optional:tag1'], value=2
)

def test_extract_timestamp(check):
job_name = "hello2-1509998340"
job_name2 = "hello-2-1509998340"
job_name3 = "hello2"
result = check._extract_job_timestamp(job_name)
assert result == 1509998340
result = check._extract_job_timestamp(job_name2)
assert result == 1509998340
result = check._extract_job_timestamp(job_name3)
assert result == 0

def test_job_counts(aggregator, instance):
hkaj marked this conversation as resolved.
check = KubernetesState(CHECK_NAME, {}, {}, [instance])
payload = mock_from_file("prometheus.txt")
check.poll = mock.MagicMock(return_value=MockResponse(payload, 'text/plain'))

for _ in range(2):
check.check(instance)
aggregator.assert_metric(
NAMESPACE + '.job.failed', tags=['namespace:default', 'job:hello', 'optional:tag1'], value=0
)
aggregator.assert_metric(
NAMESPACE + '.job.succeeded', tags=['namespace:default', 'job:hello', 'optional:tag1'], value=3
)

# Re-run check to make sure we don't count the same jobs
for _ in range(1):
Contributor: You should remove that range call.

Contributor Author: 👍

check.check(instance)
aggregator.assert_metric(
NAMESPACE + '.job.failed', tags=['namespace:default', 'job:hello', 'optional:tag1'], value=0
)
aggregator.assert_metric(
NAMESPACE + '.job.succeeded', tags=['namespace:default', 'job:hello', 'optional:tag1'], value=3
)

# Edit the payload and rerun the check
payload = payload.replace(b'kube_job_status_succeeded{job="hello-1509998340",namespace="default"} 1', b'kube_job_status_succeeded{job="hello-1509998500",namespace="default"} 1')
payload = payload.replace(b'kube_job_status_failed{job="hello-1509998340",namespace="default"} 0', b'kube_job_status_failed{job="hello-1509998510",namespace="default"} 1')

check.poll = mock.MagicMock(return_value=MockResponse(payload, 'text/plain'))
for _ in range(1):
Contributor: Same as above.

Contributor Author: 👍

check.check(instance)
aggregator.assert_metric(
NAMESPACE + '.job.failed', tags=['namespace:default', 'job:hello', 'optional:tag1'], value=1
)
aggregator.assert_metric(
NAMESPACE + '.job.succeeded', tags=['namespace:default', 'job:hello', 'optional:tag1'], value=4
)

def test_telemetry(aggregator, instance):
instance['telemetry'] = True

check = KubernetesState(CHECK_NAME, {}, {}, [instance])
with open(os.path.join(HERE, 'fixtures', 'prometheus.txt'), 'rb') as f:
check.poll = mock.MagicMock(return_value=MockResponse(f.read(), 'text/plain'))
check.poll = mock.MagicMock(return_value=MockResponse(mock_from_file("prometheus.txt"), 'text/plain'))

endpoint = instance['kube_state_url']
scraper_config = check.config_map[endpoint]