Move metric reporting back into main check #13973

Merged: fanny-jiang merged 7 commits into AI-2904/kafka-consumer-revamp from AI-2904/isolate-metric-subm on Feb 15, 2023.

Commits (all by fanny-jiang):
- 0874437 Refactor metric submissions back into check
- 20214c5 fix spaces
- 51cb515 remove todo note
- 3e06a49 fix style
- 0323629 move get broker metadata
- 560927b remove broker metadata method from classes
- 344800b reset client offsets

The diff below shows the changes from the first 4 of these commits.
```diff
@@ -1,6 +1,7 @@
 # (C) Datadog, Inc. 2019-present
 # All rights reserved
 # Licensed under Simplified BSD License (see LICENSE)
+from time import time

 from datadog_checks.base import AgentCheck, is_affirmative
 from datadog_checks.kafka_consumer.client.kafka_client_factory import make_client

@@ -38,9 +39,6 @@ def __init__(self, name, init_config, instances):

     def check(self, _):
         """The main entrypoint of the check."""
-        self._consumer_offsets = {}  # Expected format: {(consumer_group, topic, partition): offset}
-        self._highwater_offsets = {}  # Expected format: {(topic, partition): offset}
-
         # Fetch Kafka consumer offsets
         try:
             self.client.get_consumer_offsets()

@@ -70,15 +68,104 @@ def check(self, _):
             )

-        # Report the metrics
-        self.client.report_highwater_offsets(self._context_limit)
-        self.client.report_consumer_offsets_and_lag(self._context_limit - len(self.client._highwater_offsets))
+        # Expected format: {(consumer_group, topic, partition): offset}
+        self._consumer_offsets = self.client.get_consumer_offsets_dict()
+        # Expected format: {(topic, partition): offset}
+        self._highwater_offsets = self.client.get_highwater_offsets_dict()
+
+        self.report_highwater_offsets(self._context_limit)
+        self.report_consumer_offsets_and_lag(self._context_limit - len(self._highwater_offsets))

         self.collect_broker_metadata()

+    def report_highwater_offsets(self, contexts_limit):
+        """Report the broker highwater offsets."""
+        reported_contexts = 0
+        self.log.debug("Reporting broker offset metric")
+        for (topic, partition), highwater_offset in self._highwater_offsets.items():
+            broker_tags = ['topic:%s' % topic, 'partition:%s' % partition]
+            broker_tags.extend(self._custom_tags)
+            self.gauge('broker_offset', highwater_offset, tags=broker_tags)
+            reported_contexts += 1
+            if reported_contexts == contexts_limit:
+                return
+
+    def report_consumer_offsets_and_lag(self, contexts_limit):
+        """Report the consumer offsets and consumer lag."""
+        reported_contexts = 0
+        self.log.debug("Reporting consumer offsets and lag metrics")
+        for (consumer_group, topic, partition), consumer_offset in self._consumer_offsets.items():
+            if reported_contexts >= contexts_limit:
+                self.log.debug(
+                    "Reported contexts number %s greater than or equal to contexts limit of %s, returning",
+                    str(reported_contexts),
+                    str(contexts_limit),
+                )
+                return
+            consumer_group_tags = ['topic:%s' % topic, 'partition:%s' % partition, 'consumer_group:%s' % consumer_group]
+            consumer_group_tags.extend(self._custom_tags)
+
+            partitions = self.client.get_partitions_for_topic(topic)
+            self.log.debug("Received partitions %s for topic %s", partitions, topic)
+            if partitions is not None and partition in partitions:
+                # report consumer offset if the partition is valid because even if leaderless the consumer offset will
+                # be valid once the leader failover completes
+                self.gauge('consumer_offset', consumer_offset, tags=consumer_group_tags)
+                reported_contexts += 1
+
+                if (topic, partition) not in self._highwater_offsets:
+                    self.log.warning(
+                        "Consumer group: %s has offsets for topic: %s partition: %s, but no stored highwater offset "
+                        "(likely the partition is in the middle of leader failover) so cannot calculate consumer lag.",
+                        consumer_group,
+                        topic,
+                        partition,
+                    )
+                    continue
+                producer_offset = self._highwater_offsets[(topic, partition)]
+                consumer_lag = producer_offset - consumer_offset
+                if reported_contexts < contexts_limit:
+                    self.gauge('consumer_lag', consumer_lag, tags=consumer_group_tags)
+                    reported_contexts += 1
+
+                if consumer_lag < 0:
+                    # this will effectively result in data loss, so emit an event for max visibility
+                    title = "Negative consumer lag for group: {}.".format(consumer_group)
+                    message = (
+                        "Consumer group: {}, topic: {}, partition: {} has negative consumer lag. This should never "
+                        "happen and will result in the consumer skipping new messages until the lag turns "
+                        "positive.".format(consumer_group, topic, partition)
+                    )
+                    key = "{}:{}:{}".format(consumer_group, topic, partition)
+                    self.send_event(title, message, consumer_group_tags, 'consumer_lag', key, severity="error")
+                    self.log.debug(message)
+            else:
+                if partitions is None:
+                    msg = (
+                        "Consumer group: %s has offsets for topic: %s, partition: %s, but that topic has no partitions "
+                        "in the cluster, so skipping reporting these offsets."
+                    )
+                else:
+                    msg = (
+                        "Consumer group: %s has offsets for topic: %s, partition: %s, but that topic partition isn't "
+                        "included in the cluster partitions, so skipping reporting these offsets."
+                    )
+                self.log.warning(msg, consumer_group, topic, partition)
+                self.client.request_metadata_update()  # force metadata update on next poll()

     @AgentCheck.metadata_entrypoint
     def collect_broker_metadata(self):
         self.client.collect_broker_metadata()
```

Review comment on `collect_broker_metadata`: "I think we should be able to also move the implementation of …"

```diff
-    # TODO: Remove me once the tests are refactored
     def send_event(self, title, text, tags, event_type, aggregation_key, severity='info'):
-        self.client._send_event(title, text, tags, event_type, aggregation_key, severity='info')
+        """Emit an event to the Datadog Event Stream."""
+        event_dict = {
+            'timestamp': int(time()),
+            'msg_title': title,
+            'event_type': event_type,
+            'alert_type': severity,
+            'msg_text': text,
+            'tags': tags,
+            'aggregation_key': aggregation_key,
+        }
+        self.event(event_dict)
```
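Both new report methods follow the same pattern: count each metric context as it is emitted and stop once the limit is spent, which is also why `report_consumer_offsets_and_lag` receives `self._context_limit - len(self._highwater_offsets)` — the highwater metrics consume part of the budget first. A minimal standalone sketch of that pattern (the `emit_gauge` helper and the sample data are illustrative, not part of the PR):

```python
# Standalone sketch of the context-limit pattern used by the report_* methods.
# emit_gauge stands in for AgentCheck.gauge; the dict mirrors the
# {(topic, partition): offset} shape the check pulls from the client.

def emit_gauge(metric, value, tags):
    print("%s=%s %s" % (metric, value, tags))

def report_highwater_offsets(highwater_offsets, contexts_limit):
    reported = 0
    for (topic, partition), offset in highwater_offsets.items():
        if reported >= contexts_limit:
            return  # context budget spent: stop emitting
        emit_gauge('broker_offset', offset, ['topic:%s' % topic, 'partition:%s' % partition])
        reported += 1

context_limit = 5
highwater = {('orders', 0): 100, ('orders', 1): 42, ('payments', 0): 7}
report_highwater_offsets(highwater, context_limit)
# Consumer offset/lag metrics would then get the remaining budget:
# context_limit - len(highwater) contexts in this sketch.
```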
Review discussion:

"I'm a little concerned about this change here. The consumer_offsets and highwater_offsets were set to `{}` on every check run. However, in the `kafka_python_client`, the consumer_offsets and highwater_offsets dicts are initialized only when the class is initialized. The dicts never get reset in the client. I wonder if that'll change the behavior of the check or cause the dicts to keep growing larger."

"Yeah, that's a good point: we probably shouldn't be keeping a `self._consumer_offsets` or `self._highwater_offsets` value in the `KafkaPythonClient`, since the client stays alive for the entire duration. I think instead maybe we can initialize `*_offsets` in each respective `get_*_offsets_dict()`? Another possibility is we could have a function to 'reset' the values of the offsets before each check run, although that would be less pretty."

"Thanks, I'll try both of those options out and see which works better."

"I ended up adding a `reset_offsets` function to the client which the check will call at the beginning of every check run."
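
For reference, a minimal sketch of what that resolution could look like — the `reset_offsets` name comes from the discussion (and commit 344800b, "reset client offsets"), but the class skeleton and the call site are assumptions, since the final commits aren't included in this 4-commit view:

```python
class KafkaPythonClient:
    def __init__(self):
        # Cached offsets; the client outlives individual check runs.
        self._consumer_offsets = {}   # {(consumer_group, topic, partition): offset}
        self._highwater_offsets = {}  # {(topic, partition): offset}

    def reset_offsets(self):
        # Clear the cached offsets so stale entries from previous runs
        # don't accumulate for the lifetime of the client.
        self._consumer_offsets = {}
        self._highwater_offsets = {}

# Assumed call site in the check, at the start of every run:
#     def check(self, _):
#         self.client.reset_offsets()
#         ...
```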