Subject: [PATCH] [kafka] discovery of groups, topics and partitions (#612)

* Make args optional in the kafka_consumer check

The consumer_groups, topics, and partitions arguments are each now
optional. Any omitted values will be fetched from Zookeeper. If a value
is omitted, the parent value must still be its expected type (typically
a dict).

Modern Kafka consumers store their offset in Kafka rather than
Zookeeper, but this PR does not add support for this. It's tricky
because this check is much simpler if it assumes a single place can be
queried for all consumer offsets, but currently there's no easy way to
do that. Once KIP-88 is implemented, it will be much easier to add this
functionality, although it would only work for Kafka brokers >= 0.10.2.0.
In the meantime, you can instrument your individual kafka consumers to
submit their offsets to Datadog.

* Add monitor_unlisted_consumer_groups flag which defaults to False

* [kafka_consumer] send event if consumer lag is negative.

* [kafka_consumer][test] add a test to auto_discover ZK groups.

* [kafka_consumer] fix event submission.

* [kafka_consumer] format missing consumer_group key.

* [kafka_consumer] fix all path definitions + test.

* [kafka_consumer] modifying aggregation key.
 kafka_consumer/check.py               | 259 +++++++++++++++++++-------
 kafka_consumer/conf.yaml.example      |   4 +
 kafka_consumer/test_kafka_consumer.py |  19 +-
 3 files changed, 217 insertions(+), 65 deletions(-)

diff --git a/kafka_consumer/check.py b/kafka_consumer/check.py
index 95ef4fa8ee67a..89bc015807ec5 100644
--- a/kafka_consumer/check.py
+++ b/kafka_consumer/check.py
@@ -3,11 +3,11 @@
 # Licensed under Simplified BSD License (see LICENSE)
 # stdlib
-from collections import defaultdict
+import time
 # 3p
-from kafka import KafkaClient
-from kafka.common import OffsetRequestPayload as OffsetRequest
+from kafka import SimpleClient
+from kafka.structs import OffsetRequestPayload
 from kazoo.client import KazooClient
 from kazoo.exceptions import NoNodeError
@@ -19,6 +19,18 @@
 class KafkaCheck(AgentCheck):
+    """
+    Check Consumer Lag for Kafka consumers that store their offsets in Zookeeper.
+    Modern Kafka consumers store their offset in Kafka rather than Zookeeper,
+    but support for this has not been added yet. It's tricky because this check
+    is much simpler if it assumes a single place can be queried for all consumer
+    offsets, but currently there's no easy way to do that. Once KIP-88 is
+    implemented, it will be much easier to add this functionality, although it
+    would only work for Kafka brokers >= 0.10.2.0. In the meantime, you can
+    instrument your individual kafka consumers to submit their offsets to
+    Datadog.
+    """
     SOURCE_TYPE_NAME = 'kafka'
@@ -29,100 +41,219 @@ def __init__(self, name, init_config, agentConfig, instances=None):
         self.kafka_timeout = int(
             init_config.get('kafka_timeout', DEFAULT_KAFKA_TIMEOUT))
-    def check(self, instance):
-        consumer_groups = self.read_config(instance, 'consumer_groups',
-                                           cast=self._validate_consumer_groups)
-        zk_connect_str = self.read_config(instance, 'zk_connect_str')
-        kafka_host_ports = self.read_config(instance, 'kafka_connect_str')
+    def _get_highwater_offsets(self, kafka_hosts_ports):
+        """
+        Fetch highwater offsets for each topic/partition from Kafka cluster.
+        Do this for all partitions in the cluster because even if it has no
+        consumers, we may want to measure whether producers are successfully
+        producing. No need to limit this for performance because fetching broker
+        offsets from Kafka is a relatively inexpensive operation.
+        """
+        kafka_conn = SimpleClient(kafka_hosts_ports, timeout=self.kafka_timeout)
+        try:
+            broker_topics_partitions = kafka_conn.topics_to_brokers.keys()
+            # batch a bunch of requests into a single network call
+            offsets_request = [OffsetRequestPayload(topic, partition, -1, 1)
+                for topic, partition in broker_topics_partitions]
+            offsets_response = kafka_conn.send_offset_request(offsets_request)
+            highwater_offsets = {(x.topic, x.partition): x.offsets[0] for x in offsets_response}
+        finally:
+            try:
+                kafka_conn.close()
+            except Exception:
+                self.log.exception('Error cleaning up Kafka connection')
+        return highwater_offsets
+    def _get_zk_path_children(self, zk_conn, zk_path, name_for_error):
+        """Fetch child nodes for a given Zookeeper path."""
+        children = []
+        try:
+            children = zk_conn.get_children(zk_path)
+        except NoNodeError:
+            self.log.info('No zookeeper node at %s', zk_path)
+        except Exception:
+            self.log.exception('Could not read %s from %s', name_for_error, zk_path)
+        return children
+    def _get_zk_consumer_offsets(self, zk_hosts_ports, consumer_groups=None, zk_prefix=''):
+        """
+        Fetch Consumer Group offsets from Zookeeper.
+        Also fetch consumer_groups, topics, and partitions if not
+        already specified in consumer_groups.
+        :param dict consumer_groups: The consumer groups, topics, and partitions
+            that you want to fetch offsets for. If consumer_groups is None, will
+            fetch offsets for all consumer_groups. For examples of what this
+            dict can look like, see _validate_consumer_groups().
+        """
+        zk_consumer_offsets = {}
         # Construct the Zookeeper path pattern
-        zk_prefix = instance.get('zk_prefix', '')
-        zk_path_tmpl = zk_prefix + '/consumers/%s/offsets/%s/%s'
+        # /consumers/[groupId]/offsets/[topic]/[partitionId]
+        zk_path_consumer = zk_prefix + '/consumers/'
+        zk_path_topic_tmpl = zk_path_consumer + '{group}/offsets/'
+        zk_path_partition_tmpl = zk_path_topic_tmpl + '{topic}/'
-        # Connect to Zookeeper
-        zk_conn = KazooClient(zk_connect_str, timeout=self.zk_timeout)
+        zk_conn = KazooClient(zk_hosts_ports, timeout=self.zk_timeout)
-            # Query Zookeeper for consumer offsets
-            consumer_offsets = {}
-            topics = defaultdict(set)
-            for consumer_group, topic_partitions in consumer_groups.iteritems():
-                for topic, partitions in topic_partitions.iteritems():
-                    # Remember the topic partitions that we've see so that we can
-                    # look up their broker offsets later
-                    topics[topic].update(set(partitions))
+            if consumer_groups is None:
+                # If consumer groups aren't specified, fetch them from ZK
+                consumer_groups = {consumer_group: None for consumer_group in
+                    self._get_zk_path_children(zk_conn, zk_path_consumer, 'consumer groups')}
+            for consumer_group, topics in consumer_groups.iteritems():
+                if topics is None:
+                    # If topics aren't specified, fetch them from ZK
+                    zk_path_topics = zk_path_topic_tmpl.format(group=consumer_group)
+                    topics = {topic: None for topic in
+                        self._get_zk_path_children(zk_conn, zk_path_topics, 'topics')}
+                for topic, partitions in topics.iteritems():
+                    if partitions is not None:
+                        partitions = set(partitions)  # defend against bad user input
+                    else:
+                        # If partitions aren't specified, fetch them from ZK
+                        zk_path_partitions = zk_path_partition_tmpl.format(
+                            group=consumer_group, topic=topic)
+                        # Zookeeper returns the partition IDs as strings because
+                        # they are extracted from the node path
+                        partitions = [int(x) for x in self._get_zk_path_children(
+                            zk_conn, zk_path_partitions, 'partitions')]
+                    # Fetch consumer offsets for each partition from ZK
                     for partition in partitions:
-                        zk_path = zk_path_tmpl % (consumer_group, topic, partition)
+                        zk_path = (zk_path_partition_tmpl + '{partition}/').format(
+                            group=consumer_group, topic=topic, partition=partition)
                             consumer_offset = int(zk_conn.get(zk_path)[0])
                             key = (consumer_group, topic, partition)
-                            consumer_offsets[key] = consumer_offset
+                            zk_consumer_offsets[key] = consumer_offset
                         except NoNodeError:
-                            self.log.warn('No zookeeper node at %s' % zk_path)
+                            self.log.info('No zookeeper node at %s', zk_path)
                         except Exception:
-                            self.log.exception('Could not read consumer offset from %s' % zk_path)
+                            self.log.exception('Could not read consumer offset from %s', zk_path)
             except Exception:
                 self.log.exception('Error cleaning up Zookeeper connection')
+        return zk_consumer_offsets
-        # Connect to Kafka
-        kafka_conn = KafkaClient(kafka_host_ports, timeout=self.kafka_timeout)
+    def check(self, instance):
+        # For calculating lag, we have to fetch offsets from both kafka and
+        # zookeeper. There's a potential race condition because whichever one we
+        # check first may be outdated by the time we check the other. Better to
+        # check consumer offset before checking broker offset because worst case
+        # is that it overstates consumer lag a little. Doing it the other way can
+        # understate consumer lag to the point of having negative consumer lag,
+        # which just creates confusion because it's theoretically impossible.
-        try:
-            # Query Kafka for the broker offsets
-            broker_offsets = {}
-            for topic, partitions in topics.items():
-                offset_responses = kafka_conn.send_offset_request([
-                    OffsetRequest(topic, p, -1, 1) for p in partitions])
-                for resp in offset_responses:
-                    broker_offsets[(resp.topic, resp.partition)] = resp.offsets[0]
-        finally:
-            try:
-                kafka_conn.close()
-            except Exception:
-                self.log.exception('Error cleaning up Kafka connection')
+        # Fetch consumer group offsets from Zookeeper
+        zk_hosts_ports = self.read_config(instance, 'zk_connect_str')
+        zk_prefix = instance.get('zk_prefix', '')
-        # Report the broker data
-        for (topic, partition), broker_offset in broker_offsets.items():
-            broker_tags = ['topic:%s' % topic, 'partition:%s' % partition]
-            broker_offset = broker_offsets.get((topic, partition))
-            self.gauge('kafka.broker_offset', broker_offset, tags=broker_tags)
+        # If monitor_unlisted_consumer_groups is True, fetch all groups stored in ZK
+        if instance.get('monitor_unlisted_consumer_groups', False):
+            consumer_groups = None
+        else:
+            consumer_groups = self.read_config(instance, 'consumer_groups',
+                                               cast=self._validate_consumer_groups)
+        consumer_offsets = self._get_zk_consumer_offsets(
+            zk_hosts_ports, consumer_groups, zk_prefix)
+        # Fetch the broker highwater offsets
+        kafka_hosts_ports = self.read_config(instance, 'kafka_connect_str')
+        highwater_offsets = self._get_highwater_offsets(kafka_hosts_ports)
-        # Report the consumer
-        for (consumer_group, topic, partition), consumer_offset in consumer_offsets.items():
+        # Report the broker highwater offset
+        for (topic, partition), highwater_offset in highwater_offsets.iteritems():
+            broker_tags = ['topic:%s' % topic, 'partition:%s' % partition]
+            self.gauge('kafka.broker_offset', highwater_offset, tags=broker_tags)
-            # Get the broker offset
-            broker_offset = broker_offsets.get((topic, partition))
+        # Report the consumer group offsets and consumer lag
+        for (consumer_group, topic, partition), consumer_offset in consumer_offsets.iteritems():
+            consumer_group_tags = ['topic:%s' % topic, 'partition:%s' % partition,
+                'consumer_group:%s' % consumer_group]
+            self.gauge('kafka.consumer_offset', consumer_offset, tags=consumer_group_tags)
+            if (topic, partition) not in highwater_offsets:
+                self.log.warn("Consumer offsets exist for topic: {topic} "
+                    "partition: {partition} but that topic partition doesn't "
+                    "actually exist in the cluster.".format(**locals()))
+                continue
+            consumer_lag = highwater_offsets[(topic, partition)] - consumer_offset
+            if consumer_lag < 0:
+                # This is a really bad scenario because new messages produced to
+                # the topic are never consumed by that particular consumer
+                # group. So still report the negative lag as a way of increasing
+                # visibility of the error.
+                title = "Consumer lag for consumer negative."
+                message = "Consumer lag for consumer group: {group}, topic: {topic}, " \
+                    "partition: {partition} is negative. This should never happen.".format(
+                        group=consumer_group,
+                        topic=topic,
+                        partition=partition
+                    )
+                key = "{}:{}:{}".format(consumer_group, topic, partition)
+                self._send_event(title, message, consumer_group_tags, 'consumer_lag', key)
+                self.log.debug(message)
-            # Report the consumer offset and lag
-            tags = ['topic:%s' % topic, 'partition:%s' % partition,
-                    'consumer_group:%s' % consumer_group]
-            self.gauge('kafka.consumer_offset', consumer_offset, tags=tags)
-            self.gauge('kafka.consumer_lag', broker_offset - consumer_offset,
-                       tags=tags)
+            self.gauge('kafka.consumer_lag', consumer_lag,
+               tags=consumer_group_tags)
     # Private config validation/marshalling functions
     def _validate_consumer_groups(self, val):
+        # val = {'consumer_group': {'topic': [0, 1]}}
-            consumer_group, topic_partitions = val.items()[0]
-            assert isinstance(consumer_group, (str, unicode))
-            topic, partitions = topic_partitions.items()[0]
-            assert isinstance(topic, (str, unicode))
-            assert isinstance(partitions, (list, tuple))
+            # consumer groups are optional
+            assert isinstance(val, dict) or val is None
+            if val is not None:
+                for consumer_group, topics in val.iteritems():
+                    assert isinstance(consumer_group, basestring)
+                    # topics are optional
+                    assert isinstance(topics, dict) or topics is None
+                    if topics is not None:
+                        for topic, partitions in topics.iteritems():
+                            assert isinstance(topic, basestring)
+                            # partitions are optional
+                            assert isinstance(partitions, (list, tuple)) or partitions is None
+                            if partitions is not None:
+                                for partition in partitions:
+                                    assert isinstance(partition, int)
             return val
         except Exception as e:
-            raise Exception('''The `consumer_groups` value must be a mapping of mappings, like this:
+            raise Exception("""The `consumer_groups` value must be a mapping of mappings, like this:
   myconsumer0: # consumer group name
-    mytopic0: [0, 1] # topic: list of partitions
+    mytopic0: [0, 1] # topic_name: list of partitions
     mytopic0: [0, 1, 2]
     mytopic1: [10, 12]
+  myconsumer2:
+    mytopic0:
+  myconsumer3:
+Note that each level of values is optional. Any omitted values will be fetched from Zookeeper.
+You can omit partitions (example: myconsumer2), topics (example: myconsumer3), and even consumer_groups.
+If you omit consumer_groups, you must set the flag 'monitor_unlisted_consumer_groups': True.
+If a value is omitted, the parent value must still be it's expected type (typically a dict).
+    def _send_event(self, title, text, tags, type, aggregation_key):
+        event_dict = {
+            'timestamp': int(time.time()),
+            'source_type_name': self.SOURCE_TYPE_NAME,
+            'msg_title': title,
+            'event_type': type,
+            'msg_text': text,
+            'tags': tags,
+            'aggregation_key': aggregation_key,
+        }
+        self.event(event_dict)
diff --git a/kafka_consumer/conf.yaml.example b/kafka_consumer/conf.yaml.example
index 2dc68c090d5a8..23e441cf10fbd 100644
--- a/kafka_consumer/conf.yaml.example
+++ b/kafka_consumer/conf.yaml.example
@@ -18,6 +18,10 @@ instances:
   #   consumer_groups:
   #     my_consumer:
   #       my_topic: [0, 1, 4, 12]
+  #   # Setting monitor_unlisted_consumer_groups to True will tell the check to discover
+  #   # and fetch all offsets for all consumer groups stored in zookeeper. While this is
+  #   # often convenient, it can also put a lot of load on zookeeper, so use judiciously.
+  #   monitor_unlisted_consumer_groups: False
   # Production example with redundant hosts:
   # In a production environment, it's often useful to specify multiple
diff --git a/kafka_consumer/test_kafka_consumer.py b/kafka_consumer/test_kafka_consumer.py
index c8fe7d601f025..25d7ce195e278 100644
--- a/kafka_consumer/test_kafka_consumer.py
+++ b/kafka_consumer/test_kafka_consumer.py
@@ -3,9 +3,10 @@
 # Licensed under Simplified BSD License (see LICENSE)
 # stdlib
-from nose.plugins.attrib import attr
+import copy
 # 3p
+from nose.plugins.attrib import attr
 # project
 from tests.checks.common import AgentCheckTest
@@ -45,3 +46,19 @@ def test_check(self):
             self.assertMetric(mname, at_least=1)
+    def test_check_nogroups(self):
+        """
+        Testing Kafka_consumer check grabbing groups from ZK
+        """
+        nogroup_instance = copy.copy(instance)
+        nogroup_instance[0].pop('consumer_groups')
+        nogroup_instance[0]['monitor_unlisted_consumer_groups'] = True
+        self.run_check({'instances': instance})
+        for mname in METRICS:
+            self.assertMetric(mname, at_least=1)
+        self.coverage_report()