diff --git a/kafka_consumer/check.py b/kafka_consumer/check.py
index 95ef4fa8ee67a..89bc015807ec5 100644
--- a/kafka_consumer/check.py
+++ b/kafka_consumer/check.py
@@ -3,11 +3,11 @@
 # Licensed under Simplified BSD License (see LICENSE)
 # stdlib
-from collections import defaultdict
+import time
 # 3p
-from kafka import KafkaClient
-from kafka.common import OffsetRequestPayload as OffsetRequest
+from kafka import SimpleClient
+from kafka.structs import OffsetRequestPayload
 from kazoo.client import KazooClient
 from kazoo.exceptions import NoNodeError
@@ -19,6 +19,18 @@
 class KafkaCheck(AgentCheck):
+    """
+    Check Consumer Lag for Kafka consumers that store their offsets in Zookeeper.
+    Modern Kafka consumers store their offset in Kafka rather than Zookeeper,
+    but support for this has not been added yet. It's tricky because this check
+    is much simpler if it assumes a single place can be queried for all consumer
+    offsets, but currently there's no easy way to do that. Once KIP-88 is
+    implemented, it will be much easier to add this functionality, although it
+    would only work for Kafka brokers >= In the meantime, you can
+    instrument your individual kafka consumers to submit their offsets to
+    Datadog.
+    """
     SOURCE_TYPE_NAME = 'kafka'
@@ -29,100 +41,219 @@ def __init__(self, name, init_config, agentConfig, instances=None):
         self.kafka_timeout = int(
             init_config.get('kafka_timeout', DEFAULT_KAFKA_TIMEOUT))
-    def check(self, instance):
-        consumer_groups = self.read_config(instance, 'consumer_groups',
-                                           cast=self._validate_consumer_groups)
-        zk_connect_str = self.read_config(instance, 'zk_connect_str')
-        kafka_host_ports = self.read_config(instance, 'kafka_connect_str')
+    def _get_highwater_offsets(self, kafka_hosts_ports):
+        """
+        Fetch highwater offsets for each topic/partition from Kafka cluster.
+        Do this for all partitions in the cluster because even if it has no
+        consumers, we may want to measure whether producers are successfully
+        producing. No need to limit this for performance because fetching broker
+        offsets from Kafka is a relatively inexpensive operation.
+        """
+        kafka_conn = SimpleClient(kafka_hosts_ports, timeout=self.kafka_timeout)
+        try:
+            broker_topics_partitions = kafka_conn.topics_to_brokers.keys()
+            # batch a bunch of requests into a single network call
+            offsets_request = [OffsetRequestPayload(topic, partition, -1, 1)
+                for topic, partition in broker_topics_partitions]
+            offsets_response = kafka_conn.send_offset_request(offsets_request)
+            highwater_offsets = {(x.topic, x.partition): x.offsets[0] for x in offsets_response}
+        finally:
+            try:
+                kafka_conn.close()
+            except Exception:
+                self.log.exception('Error cleaning up Kafka connection')
+        return highwater_offsets
+    def _get_zk_path_children(self, zk_conn, zk_path, name_for_error):
+        """Fetch child nodes for a given Zookeeper path."""
+        children = []
+        try:
+            children = zk_conn.get_children(zk_path)
+        except NoNodeError:
+            self.log.info('No zookeeper node at %s', zk_path)
+        except Exception:
+            self.log.exception('Could not read %s from %s', name_for_error, zk_path)
+        return children
+    def _get_zk_consumer_offsets(self, zk_hosts_ports, consumer_groups=None, zk_prefix=''):
+        """
+        Fetch Consumer Group offsets from Zookeeper.
+        Also fetch consumer_groups, topics, and partitions if not
+        already specified in consumer_groups.
+        :param dict consumer_groups: The consumer groups, topics, and partitions
+            that you want to fetch offsets for. If consumer_groups is None, will
+            fetch offsets for all consumer_groups. For examples of what this
+            dict can look like, see _validate_consumer_groups().
+        """
+        zk_consumer_offsets = {}
         # Construct the Zookeeper path pattern
-        zk_prefix = instance.get('zk_prefix', '')
-        zk_path_tmpl = zk_prefix + '/consumers/%s/offsets/%s/%s'
+        # /consumers/[groupId]/offsets/[topic]/[partitionId]
+        zk_path_consumer = zk_prefix + '/consumers/'
+        zk_path_topic_tmpl = zk_path_consumer + '{group}/offsets/'
+        zk_path_partition_tmpl = zk_path_topic_tmpl + '{topic}/'
-        # Connect to Zookeeper
-        zk_conn = KazooClient(zk_connect_str, timeout=self.zk_timeout)
+        zk_conn = KazooClient(zk_hosts_ports, timeout=self.zk_timeout)
-            # Query Zookeeper for consumer offsets
-            consumer_offsets = {}
-            topics = defaultdict(set)
-            for consumer_group, topic_partitions in consumer_groups.iteritems():
-                for topic, partitions in topic_partitions.iteritems():
-                    # Remember the topic partitions that we've see so that we can
-                    # look up their broker offsets later
-                    topics[topic].update(set(partitions))
+            if consumer_groups is None:
+                # If consumer groups aren't specified, fetch them from ZK
+                consumer_groups = {consumer_group: None for consumer_group in
+                    self._get_zk_path_children(zk_conn, zk_path_consumer, 'consumer groups')}
+            for consumer_group, topics in consumer_groups.iteritems():
+                if topics is None:
+                    # If topics are't specified, fetch them from ZK
+                    zk_path_topics = zk_path_topic_tmpl.format(group=consumer_group)
+                    topics = {topic: None for topic in
+                        self._get_zk_path_children(zk_conn, zk_path_topics, 'topics')}
+                for topic, partitions in topics.iteritems():
+                    if partitions is not None:
+                        partitions = set(partitions)  # defend against bad user input
+                    else:
+                        # If partitions aren't specified, fetch them from ZK
+                        zk_path_partitions = zk_path_partition_tmpl.format(
+                            group=consumer_group, topic=topic)
+                        # Zookeeper returns the partition IDs as strings because
+                        # they are extracted from the node path
+                        partitions = [int(x) for x in self._get_zk_path_children(
+                            zk_conn, zk_path_partitions, 'partitions')]
+                    # Fetch consumer offsets for each partition from ZK
                     for partition in partitions:
-                        zk_path = zk_path_tmpl % (consumer_group, topic, partition)
+                        zk_path = (zk_path_partition_tmpl + '{partition}/').format(
+                            group=consumer_group, topic=topic, partition=partition)
                             consumer_offset = int(zk_conn.get(zk_path)[0])
                             key = (consumer_group, topic, partition)
-                            consumer_offsets[key] = consumer_offset
+                            zk_consumer_offsets[key] = consumer_offset
                         except NoNodeError:
-                            self.log.warn('No zookeeper node at %s' % zk_path)
+                            self.log.info('No zookeeper node at %s', zk_path)
                         except Exception:
-                            self.log.exception('Could not read consumer offset from %s' % zk_path)
+                            self.log.exception('Could not read consumer offset from %s', zk_path)
             except Exception:
                 self.log.exception('Error cleaning up Zookeeper connection')
+        return zk_consumer_offsets
-        # Connect to Kafka
-        kafka_conn = KafkaClient(kafka_host_ports, timeout=self.kafka_timeout)
+    def check(self, instance):
+        # For calculating lag, we have to fetch offsets from both kafka and
+        # zookeeper. There's a potential race condition because whichever one we
+        # check first may be outdated by the time we check the other. Better to
+        # check consumer offset before checking broker offset because worst case
+        # is that overstates consumer lag a little. Doing it the other way can
+        # understate consumer lag to the point of having negative consumer lag,
+        # which just creates confusion because it's theoretically impossible.
-        try:
-            # Query Kafka for the broker offsets
-            broker_offsets = {}
-            for topic, partitions in topics.items():
-                offset_responses = kafka_conn.send_offset_request([
-                    OffsetRequest(topic, p, -1, 1) for p in partitions])
-                for resp in offset_responses:
-                    broker_offsets[(resp.topic, resp.partition)] = resp.offsets[0]
-        finally:
-            try:
-                kafka_conn.close()
-            except Exception:
-                self.log.exception('Error cleaning up Kafka connection')
+        # Fetch consumer group offsets from Zookeeper
+        zk_hosts_ports = self.read_config(instance, 'zk_connect_str')
+        zk_prefix = instance.get('zk_prefix', '')
-        # Report the broker data
-        for (topic, partition), broker_offset in broker_offsets.items():
-            broker_tags = ['topic:%s' % topic, 'partition:%s' % partition]
-            broker_offset = broker_offsets.get((topic, partition))
-            self.gauge('kafka.broker_offset', broker_offset, tags=broker_tags)
+        # If monitor_unlisted_consumer_groups is True, fetch all groups stored in ZK
+        if instance.get('monitor_unlisted_consumer_groups', False):
+            consumer_groups = None
+        else:
+            consumer_groups = self.read_config(instance, 'consumer_groups',
+                                               cast=self._validate_consumer_groups)
+        consumer_offsets = self._get_zk_consumer_offsets(
+            zk_hosts_ports, consumer_groups, zk_prefix)
+        # Fetch the broker highwater offsets
+        kafka_hosts_ports = self.read_config(instance, 'kafka_connect_str')
+        highwater_offsets = self._get_highwater_offsets(kafka_hosts_ports)
-        # Report the consumer
-        for (consumer_group, topic, partition), consumer_offset in consumer_offsets.items():
+        # Report the broker highwater offset
+        for (topic, partition), highwater_offset in highwater_offsets.iteritems():
+            broker_tags = ['topic:%s' % topic, 'partition:%s' % partition]
+            self.gauge('kafka.broker_offset', highwater_offset, tags=broker_tags)
-            # Get the broker offset
-            broker_offset = broker_offsets.get((topic, partition))
+        # Report the consumer group offsets and consumer lag
+        for (consumer_group, topic, partition), consumer_offset in consumer_offsets.iteritems():
+            consumer_group_tags = ['topic:%s' % topic, 'partition:%s' % partition,
+                'consumer_group:%s' % consumer_group]
+            self.gauge('kafka.consumer_offset', consumer_offset, tags=consumer_group_tags)
+            if (topic, partition) not in highwater_offsets:
+                self.log.warn("Consumer offsets exist for topic: {topic} "
+                    "partition: {partition} but that topic partition doesn't "
+                    "actually exist in the cluster.".format(**locals()))
+                continue
+            consumer_lag = highwater_offsets[(topic, partition)] - consumer_offset
+            if consumer_lag < 0:
+                # This is a really bad scenario because new messages produced to
+                # the topic are never consumed by that particular consumer
+                # group. So still report the negative lag as a way of increasing
+                # visibility of the error.
+                title = "Consumer lag for consumer negative."
+                message = "Consumer lag for consumer group: {group}, topic: {topic}, " \
+                    "partition: {partition} is negative. This should never happen.".format(
+                        group=consumer_group,
+                        topic=topic,
+                        partition=partition
+                    )
+                key = "{}:{}:{}".format(consumer_group, topic, partition)
+                self._send_event(title, message, consumer_group_tags, 'consumer_lag', key)
+                self.log.debug(message)
-            # Report the consumer offset and lag
-            tags = ['topic:%s' % topic, 'partition:%s' % partition,
-                    'consumer_group:%s' % consumer_group]
-            self.gauge('kafka.consumer_offset', consumer_offset, tags=tags)
-            self.gauge('kafka.consumer_lag', broker_offset - consumer_offset,
-                       tags=tags)
+            self.gauge('kafka.consumer_lag', consumer_lag,
+               tags=consumer_group_tags)
     # Private config validation/marshalling functions
     def _validate_consumer_groups(self, val):
+        # val = {'consumer_group': {'topic': [0, 1]}}
-            consumer_group, topic_partitions = val.items()[0]
-            assert isinstance(consumer_group, (str, unicode))
-            topic, partitions = topic_partitions.items()[0]
-            assert isinstance(topic, (str, unicode))
-            assert isinstance(partitions, (list, tuple))
+            # consumer groups are optional
+            assert isinstance(val, dict) or val is None
+            if val is not None:
+                for consumer_group, topics in val.iteritems():
+                    assert isinstance(consumer_group, basestring)
+                    # topics are optional
+                    assert isinstance(topics, dict) or topics is None
+                    if topics is not None:
+                        for topic, partitions in topics.iteritems():
+                            assert isinstance(topic, basestring)
+                            # partitions are optional
+                            assert isinstance(partitions, (list, tuple)) or partitions is None
+                            if partitions is not None:
+                                for partition in partitions:
+                                    assert isinstance(partition, int)
             return val
         except Exception as e:
-            raise Exception('''The `consumer_groups` value must be a mapping of mappings, like this:
+            raise Exception("""The `consumer_groups` value must be a mapping of mappings, like this:
   myconsumer0: # consumer group name
-    mytopic0: [0, 1] # topic: list of partitions
+    mytopic0: [0, 1] # topic_name: list of partitions
     mytopic0: [0, 1, 2]
     mytopic1: [10, 12]
+  myconsumer2:
+    mytopic0:
+  myconsumer3:
+Note that each level of values is optional. Any omitted values will be fetched from Zookeeper.
+You can omit partitions (example: myconsumer2), topics (example: myconsumer3), and even consumer_groups.
+If you omit consumer_groups, you must set the flag 'monitor_unlisted_consumer_groups': True.
+If a value is omitted, the parent value must still be it's expected type (typically a dict).
+    def _send_event(self, title, text, tags, type, aggregation_key):
+        event_dict = {
+            'timestamp': int(time.time()),
+            'source_type_name': self.SOURCE_TYPE_NAME,
+            'msg_title': title,
+            'event_type': type,
+            'msg_text': text,
+            'tags': tags,
+            'aggregation_key': aggregation_key,
+        }
+        self.event(event_dict)
diff --git a/kafka_consumer/conf.yaml.example b/kafka_consumer/conf.yaml.example
index 2dc68c090d5a8..23e441cf10fbd 100644
--- a/kafka_consumer/conf.yaml.example
+++ b/kafka_consumer/conf.yaml.example
@@ -18,6 +18,10 @@ instances:
   #   consumer_groups:
   #     my_consumer:
   #       my_topic: [0, 1, 4, 12]
+  #   # Setting monitor_unlisted_consumer_groups to True will tell the check to discover
+  #   # and fetch all offsets for all consumer groups stored in zookeeper. While this is
+  #   # often convenient, it can also put a lot of load on zookeeper, so use judiciously.
+  #   monitor_unlisted_consumer_groups: False
   # Production example with redundant hosts:
   # In a production environment, it's often useful to specify multiple
diff --git a/kafka_consumer/test_kafka_consumer.py b/kafka_consumer/test_kafka_consumer.py
index c8fe7d601f025..25d7ce195e278 100644
--- a/kafka_consumer/test_kafka_consumer.py
+++ b/kafka_consumer/test_kafka_consumer.py
@@ -3,9 +3,10 @@
 # Licensed under Simplified BSD License (see LICENSE)
 # stdlib
-from nose.plugins.attrib import attr
+import copy
 # 3p
+from nose.plugins.attrib import attr
 # project
 from tests.checks.common import AgentCheckTest
@@ -45,3 +46,19 @@ def test_check(self):
             self.assertMetric(mname, at_least=1)
+    def test_check_nogroups(self):
+        """
+        Testing Kafka_consumer check grabbing groups from ZK
+        """
+        nogroup_instance = copy.copy(instance)
+        nogroup_instance[0].pop('consumer_groups')
+        nogroup_instance[0]['monitor_unlisted_consumer_groups'] = True
+        self.run_check({'instances': instance})
+        for mname in METRICS:
+            self.assertMetric(mname, at_least=1)
+        self.coverage_report()