Move metric reporting back into main check #13973

Merged
@@ -10,12 +10,3 @@ def get_consumer_offsets(self):

    def get_broker_offset(self):
        pass

    def report_consumer_offset_and_lag(self):
        pass

    def report_broker_offset(self):
        pass

    def collect_broker_metadata(self):
        pass
@@ -23,15 +23,3 @@ def get_consumer_offsets(self):
    @abstractmethod
    def get_highwater_offsets(self):
        pass

    @abstractmethod
    def report_consumer_offsets_and_lag(self):
        pass

    @abstractmethod
    def report_highwater_offsets(self):
        pass

    @abstractmethod
    def collect_broker_metadata(self):
        pass
@@ -3,7 +3,6 @@
# Licensed under Simplified BSD License (see LICENSE)
import ssl
from collections import defaultdict
from time import time

from kafka import KafkaAdminClient
from kafka import errors as kafka_errors
@@ -87,69 +86,6 @@ def get_consumer_offsets(self):
        self.kafka_client._wait_for_futures(self._consumer_futures)
        del self._consumer_futures # since it's reset on every check run, no sense holding the reference between runs

    def report_consumer_offsets_and_lag(self, contexts_limit):
        """Report the consumer offsets and consumer lag."""
        reported_contexts = 0
        self.log.debug("Reporting consumer offsets and lag metrics")
        for (consumer_group, topic, partition), consumer_offset in self._consumer_offsets.items():
            if reported_contexts >= contexts_limit:
                self.log.debug(
                    "Reported contexts number %s greater than or equal to contexts limit of %s, returning",
                    str(reported_contexts),
                    str(contexts_limit),
                )
                return
            consumer_group_tags = ['topic:%s' % topic, 'partition:%s' % partition, 'consumer_group:%s' % consumer_group]
            consumer_group_tags.extend(self.check._custom_tags)

            partitions = self.kafka_client._client.cluster.partitions_for_topic(topic)
            self.log.debug("Received partitions %s for topic %s", partitions, topic)
            if partitions is not None and partition in partitions:
                # report consumer offset if the partition is valid because even if leaderless the consumer offset will
                # be valid once the leader failover completes
                self.check.gauge('consumer_offset', consumer_offset, tags=consumer_group_tags)
                reported_contexts += 1

                if (topic, partition) not in self._highwater_offsets:
                    self.log.warning(
                        "Consumer group: %s has offsets for topic: %s partition: %s, but no stored highwater offset "
                        "(likely the partition is in the middle of leader failover) so cannot calculate consumer lag.",
                        consumer_group,
                        topic,
                        partition,
                    )
                    continue
                producer_offset = self._highwater_offsets[(topic, partition)]
                consumer_lag = producer_offset - consumer_offset
                if reported_contexts < contexts_limit:
                    self.check.gauge('consumer_lag', consumer_lag, tags=consumer_group_tags)
                    reported_contexts += 1

                if consumer_lag < 0:
                    # this will effectively result in data loss, so emit an event for max visibility
                    title = "Negative consumer lag for group: {}.".format(consumer_group)
                    message = (
                        "Consumer group: {}, topic: {}, partition: {} has negative consumer lag. This should never "
                        "happen and will result in the consumer skipping new messages until the lag turns "
                        "positive.".format(consumer_group, topic, partition)
                    )
                    key = "{}:{}:{}".format(consumer_group, topic, partition)
                    self._send_event(title, message, consumer_group_tags, 'consumer_lag', key, severity="error")
                    self.log.debug(message)
            else:
                if partitions is None:
                    msg = (
                        "Consumer group: %s has offsets for topic: %s, partition: %s, but that topic has no partitions "
                        "in the cluster, so skipping reporting these offsets."
                    )
                else:
                    msg = (
                        "Consumer group: %s has offsets for topic: %s, partition: %s, but that topic partition isn't "
                        "included in the cluster partitions, so skipping reporting these offsets."
                    )
                self.log.warning(msg, consumer_group, topic, partition)
                self.kafka_client._client.cluster.request_update() # force metadata update on next poll()

    def get_highwater_offsets(self):
        """Fetch highwater offsets for topic_partitions in the Kafka cluster.

@@ -215,18 +151,6 @@ def get_highwater_offsets(self):
        # Loop until all futures resolved.
        self.kafka_client._wait_for_futures(highwater_futures)

    def report_highwater_offsets(self, contexts_limit):
        """Report the broker highwater offsets."""
        reported_contexts = 0
        self.log.debug("Reporting broker offset metric")
        for (topic, partition), highwater_offset in self._highwater_offsets.items():
            broker_tags = ['topic:%s' % topic, 'partition:%s' % partition]
            broker_tags.extend(self.check._custom_tags)
            self.check.gauge('broker_offset', highwater_offset, tags=broker_tags)
            reported_contexts += 1
            if reported_contexts == contexts_limit:
                return

    def create_kafka_admin_client(self):
        return self._create_kafka_client(clazz=KafkaAdminClient)

@@ -292,13 +216,8 @@ def kafka_client(self):
            self._kafka_client = self._create_kafka_admin_client(api_version=kafka_version)
        return self._kafka_client

    def collect_broker_metadata(self):
        version_data = [str(part) for part in self.kafka_client._client.check_version()]
        version_parts = {name: part for name, part in zip(('major', 'minor', 'patch'), version_data)}

        self.check.set_metadata(
            'version', '.'.join(version_data), scheme='parts', final_scheme='semver', part_map=version_parts
        )
    def collect_broker_version(self):
        return self.kafka_client._client.check_version()

    def _highwater_offsets_callback(self, response):
        """Callback that parses an OffsetFetchResponse and saves it to the highwater_offsets dict."""
@@ -485,15 +404,18 @@ def _list_consumer_group_offsets_send_request(self, group_id, group_coordinator_
        )
        return self._send_request_to_node(group_coordinator_id, request, wakeup=False)

    def _send_event(self, title, text, tags, event_type, aggregation_key, severity='info'):
        """Emit an event to the Datadog Event Stream."""
        event_dict = {
            'timestamp': int(time()),
            'msg_title': title,
            'event_type': event_type,
            'alert_type': severity,
            'msg_text': text,
            'tags': tags,
            'aggregation_key': aggregation_key,
        }
        self.check.event(event_dict)
    def get_highwater_offsets_dict(self):
        return self._highwater_offsets

    def get_consumer_offsets_dict(self):
        return self._consumer_offsets

    def get_partitions_for_topic(self, topic):
        return self.kafka_client._client.cluster.partitions_for_topic(topic)

    def request_metadata_update(self):
        self.kafka_client._client.cluster.request_update()

    def reset_offsets(self):
        self._consumer_offsets = {}
        self._highwater_offsets = {}
110 changes: 102 additions & 8 deletions kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
@@ -1,6 +1,7 @@
# (C) Datadog, Inc. 2019-present
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)
from time import time

from datadog_checks.base import AgentCheck, is_affirmative
from datadog_checks.kafka_consumer.client.kafka_client_factory import make_client
@@ -38,10 +39,9 @@ def __init__(self, name, init_config, instances):

    def check(self, _):
        """The main entrypoint of the check."""
        self._consumer_offsets = {} # Expected format: {(consumer_group, topic, partition): offset}
        self._highwater_offsets = {} # Expected format: {(topic, partition): offset}

        # Fetch Kafka consumer offsets
        self.client.reset_offsets()

        try:
            self.client.get_consumer_offsets()
        except Exception:
@@ -70,15 +70,109 @@ def check(self, _):
            )

        # Report the metrics
        self.client.report_highwater_offsets(self._context_limit)
        self.client.report_consumer_offsets_and_lag(self._context_limit - len(self.client._highwater_offsets))
        # Expected format: {(consumer_group, topic, partition): offset}
        self._consumer_offsets = self.client.get_consumer_offsets_dict()
        # Expected format: {(topic, partition): offset}
        self._highwater_offsets = self.client.get_highwater_offsets_dict()
Comment on lines +73 to +76

fanny-jiang (Contributor Author), Feb 15, 2023:
I'm a little concerned about this change. The consumer_offsets and highwater_offsets dicts were previously reset to {} on every check run, but in the kafka_python_client they are initialized only when the class is initialized and never get reset in the client. I wonder if that will change the behavior of the check or cause the dicts to keep growing larger.

Reply (Contributor):
Yeah, that's a good point. We probably shouldn't keep a self._consumer_offsets or self._highwater_offsets value in the KafkaPythonClient, since the client stays alive for the entire duration. Instead, maybe we can initialize the *_offsets dicts in each respective get_*_offsets_dict(). Another possibility is a function to "reset" the offsets before each check run, although that would be less pretty.

fanny-jiang (Contributor Author):
Thanks, I'll try both of those options out and see which works better.

fanny-jiang (Contributor Author):
I ended up adding a reset_offsets function to the client, which the check calls at the beginning of every check run.
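To make the pattern settled on above concrete, here is a minimal sketch of a long-lived client that owns the offset caches and exposes an explicit reset. Only reset_offsets and the get_*_offsets_dict accessors mirror methods added in this PR; the class name and everything else is illustrative, not the actual integration code.

# Hedged sketch: names other than reset_offsets / get_*_offsets_dict are hypothetical.
class OffsetCachingClient:
    def __init__(self):
        # The client outlives a single check run, so these caches would otherwise persist.
        self._consumer_offsets = {}   # {(consumer_group, topic, partition): offset}
        self._highwater_offsets = {}  # {(topic, partition): offset}

    def reset_offsets(self):
        # Called at the top of every check run so stale entries from the previous
        # run are dropped and the dicts cannot grow unbounded.
        self._consumer_offsets = {}
        self._highwater_offsets = {}

    def get_consumer_offsets_dict(self):
        return self._consumer_offsets

    def get_highwater_offsets_dict(self):
        return self._highwater_offsets

In the diff, check() calls self.client.reset_offsets() at the start of every run (see the check() hunk above) and then reads the populated dicts back through these accessors before reporting.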


        self.report_highwater_offsets(self._context_limit)
        self.report_consumer_offsets_and_lag(self._context_limit - len(self._highwater_offsets))

        self.collect_broker_metadata()

    def report_highwater_offsets(self, contexts_limit):
        """Report the broker highwater offsets."""
        reported_contexts = 0
        self.log.debug("Reporting broker offset metric")
        for (topic, partition), highwater_offset in self._highwater_offsets.items():
            broker_tags = ['topic:%s' % topic, 'partition:%s' % partition]
            broker_tags.extend(self._custom_tags)
            self.gauge('broker_offset', highwater_offset, tags=broker_tags)
            reported_contexts += 1
            if reported_contexts == contexts_limit:
                return

    def report_consumer_offsets_and_lag(self, contexts_limit):
        """Report the consumer offsets and consumer lag."""
        reported_contexts = 0
        self.log.debug("Reporting consumer offsets and lag metrics")
        for (consumer_group, topic, partition), consumer_offset in self._consumer_offsets.items():
            if reported_contexts >= contexts_limit:
                self.log.debug(
                    "Reported contexts number %s greater than or equal to contexts limit of %s, returning",
                    str(reported_contexts),
                    str(contexts_limit),
                )
                return
            consumer_group_tags = ['topic:%s' % topic, 'partition:%s' % partition, 'consumer_group:%s' % consumer_group]
            consumer_group_tags.extend(self._custom_tags)

            partitions = self.client.get_partitions_for_topic(topic)
            self.log.debug("Received partitions %s for topic %s", partitions, topic)
            if partitions is not None and partition in partitions:
                # report consumer offset if the partition is valid because even if leaderless the consumer offset will
                # be valid once the leader failover completes
                self.gauge('consumer_offset', consumer_offset, tags=consumer_group_tags)
                reported_contexts += 1

                if (topic, partition) not in self._highwater_offsets:
                    self.log.warning(
                        "Consumer group: %s has offsets for topic: %s partition: %s, but no stored highwater offset "
                        "(likely the partition is in the middle of leader failover) so cannot calculate consumer lag.",
                        consumer_group,
                        topic,
                        partition,
                    )
                    continue
                producer_offset = self._highwater_offsets[(topic, partition)]
                consumer_lag = producer_offset - consumer_offset
                if reported_contexts < contexts_limit:
                    self.gauge('consumer_lag', consumer_lag, tags=consumer_group_tags)
                    reported_contexts += 1

                if consumer_lag < 0:
                    # this will effectively result in data loss, so emit an event for max visibility
                    title = "Negative consumer lag for group: {}.".format(consumer_group)
                    message = (
                        "Consumer group: {}, topic: {}, partition: {} has negative consumer lag. This should never "
                        "happen and will result in the consumer skipping new messages until the lag turns "
                        "positive.".format(consumer_group, topic, partition)
                    )
                    key = "{}:{}:{}".format(consumer_group, topic, partition)
                    self.send_event(title, message, consumer_group_tags, 'consumer_lag', key, severity="error")
                    self.log.debug(message)
            else:
                if partitions is None:
                    msg = (
                        "Consumer group: %s has offsets for topic: %s, partition: %s, but that topic has no partitions "
                        "in the cluster, so skipping reporting these offsets."
                    )
                else:
                    msg = (
                        "Consumer group: %s has offsets for topic: %s, partition: %s, but that topic partition isn't "
                        "included in the cluster partitions, so skipping reporting these offsets."
                    )
                self.log.warning(msg, consumer_group, topic, partition)
                self.client.request_metadata_update() # force metadata update on next poll()

    @AgentCheck.metadata_entrypoint
    def collect_broker_metadata(self):
        self.client.collect_broker_metadata()
        version_data = [str(part) for part in self.client.collect_broker_version()]
        version_parts = {name: part for name, part in zip(('major', 'minor', 'patch'), version_data)}

        self.set_metadata(
            'version', '.'.join(version_data), scheme='parts', final_scheme='semver', part_map=version_parts
        )

    # TODO: Remove me once the tests are refactored
    def send_event(self, title, text, tags, event_type, aggregation_key, severity='info'):
        self.client._send_event(title, text, tags, event_type, aggregation_key, severity='info')
        """Emit an event to the Datadog Event Stream."""
        event_dict = {
            'timestamp': int(time()),
            'msg_title': title,
            'event_type': event_type,
            'alert_type': severity,
            'msg_text': text,
            'tags': tags,
            'aggregation_key': aggregation_key,
        }
        self.event(event_dict)