Skip to content

Commit

Permalink
Move metric reporting back into main check (#13973)
Browse files Browse the repository at this point in the history
* Refactor metric submissions back into check

* fix spaces

* remove todo note

* fix style

* move get broker metadata

* remove broker metadata method from classes

* reset client offsets
  • Loading branch information
fanny-jiang committed Feb 16, 2023
1 parent 92434ef commit 423bb34
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,3 @@ def get_consumer_offsets(self):

def get_broker_offset(self):
    """Placeholder with no behavior; the body is empty and returns None."""
    pass

def report_consumer_offset_and_lag(self):
    """Placeholder with no behavior; the body is empty and returns None."""
    pass

def report_broker_offset(self):
    """Placeholder with no behavior; the body is empty and returns None."""
    pass

def collect_broker_metadata(self):
    """Placeholder with no behavior; the body is empty and returns None."""
    pass
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,3 @@ def get_consumer_offsets(self):
@abstractmethod
def get_highwater_offsets(self):
    """Abstract hook with an empty body; concrete clients supply the implementation."""
    pass

@abstractmethod
def report_consumer_offsets_and_lag(self):
    """Abstract hook with an empty body; concrete clients supply the implementation."""
    pass

@abstractmethod
def report_highwater_offsets(self):
    """Abstract hook with an empty body; concrete clients supply the implementation."""
    pass

@abstractmethod
def collect_broker_metadata(self):
    """Abstract hook with an empty body; concrete clients supply the implementation."""
    pass
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
# Licensed under Simplified BSD License (see LICENSE)
import ssl
from collections import defaultdict
from time import time

from kafka import KafkaAdminClient
from kafka import errors as kafka_errors
Expand Down Expand Up @@ -87,69 +86,6 @@ def get_consumer_offsets(self):
self.kafka_client._wait_for_futures(self._consumer_futures)
del self._consumer_futures # since it's reset on every check run, no sense holding the reference between runs

def report_consumer_offsets_and_lag(self, contexts_limit):
    """Report the consumer offsets and consumer lag.

    Iterates ``self._consumer_offsets`` (keys are
    ``(consumer_group, topic, partition)`` tuples, values are offsets) and
    submits gauges through ``self.check``.  Each submitted gauge counts as one
    context; reporting stops once ``contexts_limit`` contexts were submitted.

    :param contexts_limit: maximum number of gauges this call may submit.
    """
    reported_contexts = 0
    self.log.debug("Reporting consumer offsets and lag metrics")
    for (consumer_group, topic, partition), consumer_offset in self._consumer_offsets.items():
        if reported_contexts >= contexts_limit:
            self.log.debug(
                "Reported contexts number %s greater than or equal to contexts limit of %s, returning",
                reported_contexts,
                contexts_limit,
            )
            return
        consumer_group_tags = ['topic:%s' % topic, 'partition:%s' % partition, 'consumer_group:%s' % consumer_group]
        consumer_group_tags.extend(self.check._custom_tags)

        partitions = self.kafka_client._client.cluster.partitions_for_topic(topic)
        self.log.debug("Received partitions %s for topic %s", partitions, topic)

        # Guard clause: the partition is unknown to the cluster metadata, so skip it.
        if partitions is None or partition not in partitions:
            if partitions is None:
                msg = (
                    "Consumer group: %s has offsets for topic: %s, partition: %s, but that topic has no partitions "
                    "in the cluster, so skipping reporting these offsets."
                )
            else:
                msg = (
                    "Consumer group: %s has offsets for topic: %s, partition: %s, but that topic partition isn't "
                    "included in the cluster partitions, so skipping reporting these offsets."
                )
            self.log.warning(msg, consumer_group, topic, partition)
            self.kafka_client._client.cluster.request_update()  # force metadata update on next poll()
            continue

        # report consumer offset if the partition is valid because even if leaderless the consumer offset will
        # be valid once the leader failover completes
        self.check.gauge('consumer_offset', consumer_offset, tags=consumer_group_tags)
        reported_contexts += 1

        if (topic, partition) not in self._highwater_offsets:
            self.log.warning(
                "Consumer group: %s has offsets for topic: %s partition: %s, but no stored highwater offset "
                "(likely the partition is in the middle of leader failover) so cannot calculate consumer lag.",
                consumer_group,
                topic,
                partition,
            )
            continue

        consumer_lag = self._highwater_offsets[(topic, partition)] - consumer_offset
        # The offset gauge above may have used up the last available context.
        if reported_contexts < contexts_limit:
            self.check.gauge('consumer_lag', consumer_lag, tags=consumer_group_tags)
            reported_contexts += 1

        if consumer_lag < 0:
            # this will effectively result in data loss, so emit an event for max visibility
            title = "Negative consumer lag for group: {}.".format(consumer_group)
            message = (
                "Consumer group: {}, topic: {}, partition: {} has negative consumer lag. This should never "
                "happen and will result in the consumer skipping new messages until the lag turns "
                "positive.".format(consumer_group, topic, partition)
            )
            key = "{}:{}:{}".format(consumer_group, topic, partition)
            self._send_event(title, message, consumer_group_tags, 'consumer_lag', key, severity="error")
            self.log.debug(message)

def get_highwater_offsets(self):
"""Fetch highwater offsets for topic_partitions in the Kafka cluster.
Expand Down Expand Up @@ -215,18 +151,6 @@ def get_highwater_offsets(self):
# Loop until all futures resolved.
self.kafka_client._wait_for_futures(highwater_futures)

def report_highwater_offsets(self, contexts_limit):
    """Report the broker highwater offsets.

    Submits one ``broker_offset`` gauge through ``self.check`` per
    ``(topic, partition)`` entry of ``self._highwater_offsets``.

    :param contexts_limit: maximum number of gauges to submit; a limit of
        zero or less submits nothing.
    """
    reported_contexts = 0
    self.log.debug("Reporting broker offset metric")
    for (topic, partition), highwater_offset in self._highwater_offsets.items():
        # Check before submitting so a non-positive limit cannot be skipped over.
        if reported_contexts >= contexts_limit:
            return
        broker_tags = ['topic:%s' % topic, 'partition:%s' % partition]
        broker_tags.extend(self.check._custom_tags)
        self.check.gauge('broker_offset', highwater_offset, tags=broker_tags)
        reported_contexts += 1

def create_kafka_admin_client(self):
    """Return the result of ``self._create_kafka_client`` called with ``clazz=KafkaAdminClient``."""
    return self._create_kafka_client(clazz=KafkaAdminClient)

Expand Down Expand Up @@ -292,13 +216,8 @@ def kafka_client(self):
self._kafka_client = self._create_kafka_admin_client(api_version=kafka_version)
return self._kafka_client

def collect_broker_metadata(self):
    """Submit the broker version as ``version`` metadata on ``self.check``.

    The version comes from ``self.kafka_client._client.check_version()``; its
    parts are stringified, joined with dots, and also passed as a
    ``major``/``minor``/``patch`` part map.
    """
    version_data = [str(part) for part in self.kafka_client._client.check_version()]
    # zip() stops at the shorter input, so extra version parts are left out of the map.
    version_parts = dict(zip(('major', 'minor', 'patch'), version_data))

    self.check.set_metadata(
        'version', '.'.join(version_data), scheme='parts', final_scheme='semver', part_map=version_parts
    )
def collect_broker_version(self):
    """Return the value of ``self.kafka_client._client.check_version()`` unchanged."""
    return self.kafka_client._client.check_version()

def _highwater_offsets_callback(self, response):
"""Callback that parses an OffsetFetchResponse and saves it to the highwater_offsets dict."""
Expand Down Expand Up @@ -485,15 +404,18 @@ def _list_consumer_group_offsets_send_request(self, group_id, group_coordinator_
)
return self._send_request_to_node(group_coordinator_id, request, wakeup=False)

def _send_event(self, title, text, tags, event_type, aggregation_key, severity='info'):
    """Emit an event to the Datadog Event Stream.

    Builds the event payload, stamped with the current time in whole seconds,
    and passes it to ``self.check.event``.

    :param title: stored as ``msg_title``.
    :param text: stored as ``msg_text``.
    :param tags: stored as ``tags``.
    :param event_type: stored as ``event_type``.
    :param aggregation_key: stored as ``aggregation_key``.
    :param severity: stored as ``alert_type``; defaults to ``'info'``.
    """
    event_dict = {
        'timestamp': int(time()),
        'msg_title': title,
        'event_type': event_type,
        'alert_type': severity,
        'msg_text': text,
        'tags': tags,
        'aggregation_key': aggregation_key,
    }
    self.check.event(event_dict)
def get_highwater_offsets_dict(self):
    """Return ``self._highwater_offsets`` itself (not a copy)."""
    return self._highwater_offsets

def get_consumer_offsets_dict(self):
    """Return ``self._consumer_offsets`` itself (not a copy)."""
    return self._consumer_offsets

def get_partitions_for_topic(self, topic):
    """Return what the client's cluster metadata reports as the partitions of ``topic``.

    Delegates to ``self.kafka_client._client.cluster.partitions_for_topic``.
    """
    return self.kafka_client._client.cluster.partitions_for_topic(topic)

def request_metadata_update(self):
    """Call ``request_update()`` on the client's cluster metadata; returns None."""
    self.kafka_client._client.cluster.request_update()

def reset_offsets(self):
    """Rebind both offset stores to fresh empty dicts.

    The previous dict objects are not mutated, so references handed out
    earlier keep their old contents.
    """
    self._consumer_offsets = {}
    self._highwater_offsets = {}
110 changes: 102 additions & 8 deletions kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# (C) Datadog, Inc. 2019-present
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)
from time import time

from datadog_checks.base import AgentCheck, is_affirmative
from datadog_checks.kafka_consumer.client.kafka_client_factory import make_client
Expand Down Expand Up @@ -38,10 +39,9 @@ def __init__(self, name, init_config, instances):

def check(self, _):
"""The main entrypoint of the check."""
self._consumer_offsets = {} # Expected format: {(consumer_group, topic, partition): offset}
self._highwater_offsets = {} # Expected format: {(topic, partition): offset}

# Fetch Kafka consumer offsets
self.client.reset_offsets()

try:
self.client.get_consumer_offsets()
except Exception:
Expand Down Expand Up @@ -70,15 +70,109 @@ def check(self, _):
)

# Report the metrics
self.client.report_highwater_offsets(self._context_limit)
self.client.report_consumer_offsets_and_lag(self._context_limit - len(self.client._highwater_offsets))
# Expected format: {(consumer_group, topic, partition): offset}
self._consumer_offsets = self.client.get_consumer_offsets_dict()
# Expected format: {(topic, partition): offset}
self._highwater_offsets = self.client.get_highwater_offsets_dict()

self.report_highwater_offsets(self._context_limit)
self.report_consumer_offsets_and_lag(self._context_limit - len(self._highwater_offsets))

self.collect_broker_metadata()

def report_highwater_offsets(self, contexts_limit):
    """Report the broker highwater offsets.

    Submits one ``broker_offset`` gauge per ``(topic, partition)`` entry of
    ``self._highwater_offsets``, tagged with the topic, the partition and
    ``self._custom_tags``.

    :param contexts_limit: maximum number of gauges to submit; a limit of
        zero or less submits nothing.
    """
    reported_contexts = 0
    self.log.debug("Reporting broker offset metric")
    for (topic, partition), highwater_offset in self._highwater_offsets.items():
        # Check before submitting so a non-positive limit cannot be skipped over.
        if reported_contexts >= contexts_limit:
            return
        broker_tags = ['topic:%s' % topic, 'partition:%s' % partition]
        broker_tags.extend(self._custom_tags)
        self.gauge('broker_offset', highwater_offset, tags=broker_tags)
        reported_contexts += 1

def report_consumer_offsets_and_lag(self, contexts_limit):
    """Report the consumer offsets and consumer lag.

    Iterates ``self._consumer_offsets`` (keys are
    ``(consumer_group, topic, partition)`` tuples, values are offsets) and
    submits ``consumer_offset`` and ``consumer_lag`` gauges.  Each submitted
    gauge counts as one context; reporting stops once ``contexts_limit``
    contexts were submitted.  A negative lag additionally emits an error event.

    :param contexts_limit: maximum number of gauges this call may submit.
    """
    reported_contexts = 0
    self.log.debug("Reporting consumer offsets and lag metrics")
    for (consumer_group, topic, partition), consumer_offset in self._consumer_offsets.items():
        if reported_contexts >= contexts_limit:
            self.log.debug(
                "Reported contexts number %s greater than or equal to contexts limit of %s, returning",
                reported_contexts,
                contexts_limit,
            )
            return
        consumer_group_tags = ['topic:%s' % topic, 'partition:%s' % partition, 'consumer_group:%s' % consumer_group]
        consumer_group_tags.extend(self._custom_tags)

        partitions = self.client.get_partitions_for_topic(topic)
        self.log.debug("Received partitions %s for topic %s", partitions, topic)

        # Guard clause: the partition is unknown to the cluster metadata, so skip it.
        if partitions is None or partition not in partitions:
            if partitions is None:
                msg = (
                    "Consumer group: %s has offsets for topic: %s, partition: %s, but that topic has no partitions "
                    "in the cluster, so skipping reporting these offsets."
                )
            else:
                msg = (
                    "Consumer group: %s has offsets for topic: %s, partition: %s, but that topic partition isn't "
                    "included in the cluster partitions, so skipping reporting these offsets."
                )
            self.log.warning(msg, consumer_group, topic, partition)
            self.client.request_metadata_update()  # force metadata update on next poll()
            continue

        # report consumer offset if the partition is valid because even if leaderless the consumer offset will
        # be valid once the leader failover completes
        self.gauge('consumer_offset', consumer_offset, tags=consumer_group_tags)
        reported_contexts += 1

        if (topic, partition) not in self._highwater_offsets:
            self.log.warning(
                "Consumer group: %s has offsets for topic: %s partition: %s, but no stored highwater offset "
                "(likely the partition is in the middle of leader failover) so cannot calculate consumer lag.",
                consumer_group,
                topic,
                partition,
            )
            continue

        consumer_lag = self._highwater_offsets[(topic, partition)] - consumer_offset
        # The offset gauge above may have used up the last available context.
        if reported_contexts < contexts_limit:
            self.gauge('consumer_lag', consumer_lag, tags=consumer_group_tags)
            reported_contexts += 1

        if consumer_lag < 0:
            # this will effectively result in data loss, so emit an event for max visibility
            title = "Negative consumer lag for group: {}.".format(consumer_group)
            message = (
                "Consumer group: {}, topic: {}, partition: {} has negative consumer lag. This should never "
                "happen and will result in the consumer skipping new messages until the lag turns "
                "positive.".format(consumer_group, topic, partition)
            )
            key = "{}:{}:{}".format(consumer_group, topic, partition)
            self.send_event(title, message, consumer_group_tags, 'consumer_lag', key, severity="error")
            self.log.debug(message)

@AgentCheck.metadata_entrypoint
def collect_broker_metadata(self):
    """Submit the broker version as ``version`` metadata.

    The version comes from ``self.client.collect_broker_version()``; its parts
    are stringified, joined with dots, and also passed as a
    ``major``/``minor``/``patch`` part map.
    """
    version_data = [str(part) for part in self.client.collect_broker_version()]
    # zip() stops at the shorter input, so extra version parts are left out of the map.
    version_parts = dict(zip(('major', 'minor', 'patch'), version_data))

    self.set_metadata(
        'version', '.'.join(version_data), scheme='parts', final_scheme='semver', part_map=version_parts
    )

def send_event(self, title, text, tags, event_type, aggregation_key, severity='info'):
    """Emit an event to the Datadog Event Stream.

    Builds the event payload, stamped with the current time in whole seconds,
    and passes it to ``self.event`` exactly once.

    :param title: stored as ``msg_title``.
    :param text: stored as ``msg_text``.
    :param tags: stored as ``tags``.
    :param event_type: stored as ``event_type``.
    :param aggregation_key: stored as ``aggregation_key``.
    :param severity: stored as ``alert_type``; defaults to ``'info'``.
    """
    event_dict = {
        'timestamp': int(time()),
        'msg_title': title,
        'event_type': event_type,
        'alert_type': severity,
        'msg_text': text,
        'tags': tags,
        'aggregation_key': aggregation_key,
    }
    self.event(event_dict)

0 comments on commit 423bb34

Please sign in to comment.