Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[kafka_consumer] discovery of groups, topics and partitions #631

Merged
merged 9 commits into from
Jul 25, 2017
10 changes: 9 additions & 1 deletion kafka_consumer/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# CHANGELOG - kafka_consumer

1.1.0 / Unreleased
==================

### Changes

* [FEATURE] discovery of groups, topics and partitions. See [#631][] (Thanks [@jeffwidman][])

1.0.1 / 2017-04-24
==================

Expand All @@ -16,4 +23,5 @@

<!--- The following link definition list is generated by PimpMyChangelog --->
[#272]: https://github.com/DataDog/integrations-core/issues/272
[@jeffwidman]: https://github.com/jeffwidman
[#631]: https://github.com/DataDog/integrations-core/issues/631
[@jeffwidman]: https://github.com/jeffwidman
259 changes: 195 additions & 64 deletions kafka_consumer/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
# Licensed under Simplified BSD License (see LICENSE)

# stdlib
from collections import defaultdict
import time

# 3p
from kafka import KafkaClient
from kafka.common import OffsetRequestPayload as OffsetRequest
from kafka import SimpleClient
from kafka.structs import OffsetRequestPayload
from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError

Expand All @@ -19,6 +19,18 @@


class KafkaCheck(AgentCheck):
"""
Check Consumer Lag for Kafka consumers that store their offsets in Zookeeper.

Modern Kafka consumers store their offset in Kafka rather than Zookeeper,
but support for this has not been added yet. It's tricky because this check
is much simpler if it assumes a single place can be queried for all consumer
offsets, but currently there's no easy way to do that. Once KIP-88 is
implemented, it will be much easier to add this functionality, although it
would only work for Kafka brokers >= 0.10.2.0. In the meantime, you can
instrument your individual kafka consumers to submit their offsets to
Datadog.
"""

SOURCE_TYPE_NAME = 'kafka'

Expand All @@ -29,100 +41,219 @@ def __init__(self, name, init_config, agentConfig, instances=None):
self.kafka_timeout = int(
init_config.get('kafka_timeout', DEFAULT_KAFKA_TIMEOUT))

def check(self, instance):
consumer_groups = self.read_config(instance, 'consumer_groups',
cast=self._validate_consumer_groups)
zk_connect_str = self.read_config(instance, 'zk_connect_str')
kafka_host_ports = self.read_config(instance, 'kafka_connect_str')
def _get_highwater_offsets(self, kafka_hosts_ports):
    """
    Return the highwater offset of every topic/partition known to the cluster.

    Every partition is queried, including those without consumers, so that
    producer activity can be observed as well. Fetching broker offsets from
    Kafka is comparatively cheap, so no attempt is made to restrict the set
    of partitions for performance reasons.

    :param kafka_hosts_ports: Kafka connection string, passed unchanged to
        SimpleClient.
    :returns: dict mapping (topic, partition) to the first offset returned
        for that partition.
    """
    client = SimpleClient(kafka_hosts_ports, timeout=self.kafka_timeout)
    try:
        # Build one payload per topic/partition so that everything goes out
        # in a single batched offset request.
        payloads = []
        for topic, partition in client.topics_to_brokers.keys():
            payloads.append(OffsetRequestPayload(topic, partition, -1, 1))

        highwater_offsets = {}
        for response in client.send_offset_request(payloads):
            highwater_offsets[(response.topic, response.partition)] = response.offsets[0]
    finally:
        # Closing is best-effort: log a failure, never mask the real error.
        try:
            client.close()
        except Exception:
            self.log.exception('Error cleaning up Kafka connection')
    return highwater_offsets

def _get_zk_path_children(self, zk_conn, zk_path, name_for_error):
    """
    Return the child node names found under a Zookeeper path.

    :param zk_conn: connected Zookeeper client exposing get_children().
    :param zk_path: path whose children should be listed.
    :param name_for_error: human-readable description of what is being
        listed; only used in the error log message.
    :returns: the children reported by Zookeeper, or an empty list when the
        node does not exist or the lookup fails for any other reason.
    """
    try:
        return zk_conn.get_children(zk_path)
    except NoNodeError:
        # A missing node is an expected situation, hence only info level.
        self.log.info('No zookeeper node at %s', zk_path)
    except Exception:
        self.log.exception('Could not read %s from %s', name_for_error, zk_path)
    return []

def _get_zk_consumer_offsets(self, zk_hosts_ports, consumer_groups=None, zk_prefix=''):
"""
Fetch Consumer Group offsets from Zookeeper.

Also fetch consumer_groups, topics, and partitions if not
already specified in consumer_groups.

:param dict consumer_groups: The consumer groups, topics, and partitions
that you want to fetch offsets for. If consumer_groups is None, will
fetch offsets for all consumer_groups. For examples of what this
dict can look like, see _validate_consumer_groups().
"""
zk_consumer_offsets = {}

# Construct the Zookeeper path pattern
zk_prefix = instance.get('zk_prefix', '')
zk_path_tmpl = zk_prefix + '/consumers/%s/offsets/%s/%s'
# /consumers/[groupId]/offsets/[topic]/[partitionId]
zk_path_consumer = zk_prefix + '/consumers/'
zk_path_topic_tmpl = zk_path_consumer + '{group}/offsets/'
zk_path_partition_tmpl = zk_path_topic_tmpl + '{topic}/'

# Connect to Zookeeper
zk_conn = KazooClient(zk_connect_str, timeout=self.zk_timeout)
zk_conn = KazooClient(zk_hosts_ports, timeout=self.zk_timeout)
zk_conn.start()

try:
# Query Zookeeper for consumer offsets
consumer_offsets = {}
topics = defaultdict(set)
for consumer_group, topic_partitions in consumer_groups.iteritems():
for topic, partitions in topic_partitions.iteritems():
# Remember the topic partitions that we've see so that we can
# look up their broker offsets later
topics[topic].update(set(partitions))
if consumer_groups is None:
# If consumer groups aren't specified, fetch them from ZK
consumer_groups = {consumer_group: None for consumer_group in
self._get_zk_path_children(zk_conn, zk_path_consumer, 'consumer groups')}

for consumer_group, topics in consumer_groups.iteritems():
if topics is None:
# If topics aren't specified, fetch them from ZK
zk_path_topics = zk_path_topic_tmpl.format(group=consumer_group)
topics = {topic: None for topic in
self._get_zk_path_children(zk_conn, zk_path_topics, 'topics')}

for topic, partitions in topics.iteritems():
if partitions is not None:
partitions = set(partitions) # defend against bad user input
else:
# If partitions aren't specified, fetch them from ZK
zk_path_partitions = zk_path_partition_tmpl.format(
group=consumer_group, topic=topic)
# Zookeeper returns the partition IDs as strings because
# they are extracted from the node path
partitions = [int(x) for x in self._get_zk_path_children(
zk_conn, zk_path_partitions, 'partitions')]

# Fetch consumer offsets for each partition from ZK
for partition in partitions:
zk_path = zk_path_tmpl % (consumer_group, topic, partition)
zk_path = (zk_path_partition_tmpl + '{partition}/').format(
group=consumer_group, topic=topic, partition=partition)
try:
consumer_offset = int(zk_conn.get(zk_path)[0])
key = (consumer_group, topic, partition)
consumer_offsets[key] = consumer_offset
zk_consumer_offsets[key] = consumer_offset
except NoNodeError:
self.log.warn('No zookeeper node at %s' % zk_path)
self.log.info('No zookeeper node at %s', zk_path)
except Exception:
self.log.exception('Could not read consumer offset from %s' % zk_path)
self.log.exception('Could not read consumer offset from %s', zk_path)
finally:
try:
zk_conn.stop()
zk_conn.close()
except Exception:
self.log.exception('Error cleaning up Zookeeper connection')
return zk_consumer_offsets

# Connect to Kafka
kafka_conn = KafkaClient(kafka_host_ports, timeout=self.kafka_timeout)
def check(self, instance):
# For calculating lag, we have to fetch offsets from both kafka and
# zookeeper. There's a potential race condition because whichever one we
# check first may be outdated by the time we check the other. Better to
# check consumer offset before checking broker offset because worst case
# is that it overstates consumer lag a little. Doing it the other way can
# understate consumer lag to the point of having negative consumer lag,
# which just creates confusion because it's theoretically impossible.

try:
# Query Kafka for the broker offsets
broker_offsets = {}
for topic, partitions in topics.items():
offset_responses = kafka_conn.send_offset_request([
OffsetRequest(topic, p, -1, 1) for p in partitions])

for resp in offset_responses:
broker_offsets[(resp.topic, resp.partition)] = resp.offsets[0]
finally:
try:
kafka_conn.close()
except Exception:
self.log.exception('Error cleaning up Kafka connection')
# Fetch consumer group offsets from Zookeeper
zk_hosts_ports = self.read_config(instance, 'zk_connect_str')
zk_prefix = instance.get('zk_prefix', '')

# Report the broker data
for (topic, partition), broker_offset in broker_offsets.items():
broker_tags = ['topic:%s' % topic, 'partition:%s' % partition]
broker_offset = broker_offsets.get((topic, partition))
self.gauge('kafka.broker_offset', broker_offset, tags=broker_tags)
# If monitor_unlisted_consumer_groups is True, fetch all groups stored in ZK
if instance.get('monitor_unlisted_consumer_groups', False):
consumer_groups = None
else:
consumer_groups = self.read_config(instance, 'consumer_groups',
cast=self._validate_consumer_groups)

consumer_offsets = self._get_zk_consumer_offsets(
zk_hosts_ports, consumer_groups, zk_prefix)

# Fetch the broker highwater offsets
kafka_hosts_ports = self.read_config(instance, 'kafka_connect_str')
highwater_offsets = self._get_highwater_offsets(kafka_hosts_ports)

# Report the consumer
for (consumer_group, topic, partition), consumer_offset in consumer_offsets.items():
# Report the broker highwater offset
for (topic, partition), highwater_offset in highwater_offsets.iteritems():
broker_tags = ['topic:%s' % topic, 'partition:%s' % partition]
self.gauge('kafka.broker_offset', highwater_offset, tags=broker_tags)

# Get the broker offset
broker_offset = broker_offsets.get((topic, partition))
# Report the consumer group offsets and consumer lag
for (consumer_group, topic, partition), consumer_offset in consumer_offsets.iteritems():
consumer_group_tags = ['topic:%s' % topic, 'partition:%s' % partition,
'consumer_group:%s' % consumer_group]
self.gauge('kafka.consumer_offset', consumer_offset, tags=consumer_group_tags)
if (topic, partition) not in highwater_offsets:
self.log.warn("Consumer offsets exist for topic: {topic} "
"partition: {partition} but that topic partition doesn't "
"actually exist in the cluster.".format(**locals()))
continue
consumer_lag = highwater_offsets[(topic, partition)] - consumer_offset
if consumer_lag < 0:
# This is a really bad scenario because new messages produced to
# the topic are never consumed by that particular consumer
# group. So still report the negative lag as a way of increasing
# visibility of the error.
title = "Consumer lag for consumer negative."
message = "Consumer lag for consumer group: {group}, topic: {topic}, " \
"partition: {partition} is negative. This should never happen.".format(
group=consumer_group,
topic=topic,
partition=partition
)
key = "{}:{}:{}".format(consumer_group, topic, partition)
self._send_event(title, message, consumer_group_tags, 'consumer_lag', key)
self.log.debug(message)

# Report the consumer offset and lag
tags = ['topic:%s' % topic, 'partition:%s' % partition,
'consumer_group:%s' % consumer_group]
self.gauge('kafka.consumer_offset', consumer_offset, tags=tags)
self.gauge('kafka.consumer_lag', broker_offset - consumer_offset,
tags=tags)
self.gauge('kafka.consumer_lag', consumer_lag,
tags=consumer_group_tags)

# Private config validation/marshalling functions

def _validate_consumer_groups(self, val):
# val = {'consumer_group': {'topic': [0, 1]}}
try:
consumer_group, topic_partitions = val.items()[0]
assert isinstance(consumer_group, (str, unicode))
topic, partitions = topic_partitions.items()[0]
assert isinstance(topic, (str, unicode))
assert isinstance(partitions, (list, tuple))
# consumer groups are optional
assert isinstance(val, dict) or val is None
if val is not None:
for consumer_group, topics in val.iteritems():
assert isinstance(consumer_group, basestring)
# topics are optional
assert isinstance(topics, dict) or topics is None
if topics is not None:
for topic, partitions in topics.iteritems():
assert isinstance(topic, basestring)
# partitions are optional
assert isinstance(partitions, (list, tuple)) or partitions is None
if partitions is not None:
for partition in partitions:
assert isinstance(partition, int)
return val
except Exception as e:
self.log.exception(e)
raise Exception('''The `consumer_groups` value must be a mapping of mappings, like this:
raise Exception("""The `consumer_groups` value must be a mapping of mappings, like this:
consumer_groups:
myconsumer0: # consumer group name
mytopic0: [0, 1] # topic: list of partitions
mytopic0: [0, 1] # topic_name: list of partitions
myconsumer1:
mytopic0: [0, 1, 2]
mytopic1: [10, 12]
''')
myconsumer2:
mytopic0:
myconsumer3:

Note that each level of values is optional. Any omitted values will be fetched from Zookeeper.
You can omit partitions (example: myconsumer2), topics (example: myconsumer3), and even consumer_groups.
If you omit consumer_groups, you must set the flag 'monitor_unlisted_consumer_groups': True.
If a value is omitted, the parent value must still be it's expected type (typically a dict).
""")

def _send_event(self, title, text, tags, type, aggregation_key):
    """
    Build an event payload stamped with the current time and submit it.

    :param title: event title (sent as 'msg_title').
    :param text: event body (sent as 'msg_text').
    :param tags: tags attached to the event.
    :param type: event type (sent as 'event_type').
    :param aggregation_key: key sent as 'aggregation_key'.
    """
    now = int(time.time())
    payload = dict(
        timestamp=now,
        source_type_name=self.SOURCE_TYPE_NAME,
        msg_title=title,
        event_type=type,
        msg_text=text,
        tags=tags,
        aggregation_key=aggregation_key,
    )
    self.event(payload)
4 changes: 4 additions & 0 deletions kafka_consumer/conf.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ instances:
# consumer_groups:
# my_consumer:
# my_topic: [0, 1, 4, 12]
# # Setting monitor_unlisted_consumer_groups to True will tell the check to discover
# # and fetch all offsets for all consumer groups stored in zookeeper. While this is
# # often convenient, it can also put a lot of load on zookeeper, so use judiciously.
# monitor_unlisted_consumer_groups: False

# Production example with redundant hosts:
# In a production environment, it's often useful to specify multiple
Expand Down
2 changes: 1 addition & 1 deletion kafka_consumer/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@
"linux",
"mac_os"
],
"version": "1.0.0",
"version": "1.1.0",
"guid": "74e1ce9c-dee9-4ad5-9b6a-05eeee7f5259"
}
19 changes: 18 additions & 1 deletion kafka_consumer/test_kafka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
# Licensed under Simplified BSD License (see LICENSE)

# stdlib
from nose.plugins.attrib import attr
import copy

# 3p
from nose.plugins.attrib import attr

# project
from tests.checks.common import AgentCheckTest
Expand Down Expand Up @@ -45,3 +46,19 @@ def test_check(self):
self.assertMetric(mname, at_least=1)

self.coverage_report()


def test_check_nogroups(self):
    """
    Test that the check reports metrics when no consumer groups are
    configured and `monitor_unlisted_consumer_groups` is enabled.
    """
    # Deep copy: `instance` is a list of dicts, so a shallow copy would share
    # the inner dict and popping `consumer_groups` would silently mutate the
    # module-level fixture used by the other tests.
    nogroup_instance = copy.deepcopy(instance)
    nogroup_instance[0].pop('consumer_groups')
    nogroup_instance[0]['monitor_unlisted_consumer_groups'] = True

    # Run the check against the modified configuration, not the original
    # fixture, so the test exercises the discovery code path on purpose.
    self.run_check({'instances': nogroup_instance})

    for mname in METRICS:
        self.assertMetric(mname, at_least=1)

    self.coverage_report()