diff --git a/kafka_consumer/check.py b/kafka_consumer/check.py
index 89bc015807ec5..95ef4fa8ee67a 100644
--- a/kafka_consumer/check.py
+++ b/kafka_consumer/check.py
@@ -3,11 +3,11 @@
 # Licensed under Simplified BSD License (see LICENSE)
 
 # stdlib
-import time
+from collections import defaultdict
 
 # 3p
-from kafka import SimpleClient
-from kafka.structs import OffsetRequestPayload
+from kafka import KafkaClient
+from kafka.common import OffsetRequestPayload as OffsetRequest
 from kazoo.client import KazooClient
 from kazoo.exceptions import NoNodeError
 
@@ -19,18 +19,6 @@
 
 
 class KafkaCheck(AgentCheck):
-    """
-    Check Consumer Lag for Kafka consumers that store their offsets in Zookeeper.
-
-    Modern Kafka consumers store their offset in Kafka rather than Zookeeper,
-    but support for this has not been added yet. It's tricky because this check
-    is much simpler if it assumes a single place can be queried for all consumer
-    offsets, but currently there's no easy way to do that. Once KIP-88 is
-    implemented, it will be much easier to add this functionality, although it
-    would only work for Kafka brokers >= 0.10.2.0. In the meantime, you can
-    instrument your individual kafka consumers to submit their offsets to
-    Datadog.
-    """
 
     SOURCE_TYPE_NAME = 'kafka'
 
@@ -41,219 +29,100 @@ def __init__(self, name, init_config, agentConfig, instances=None):
         self.kafka_timeout = int(
             init_config.get('kafka_timeout', DEFAULT_KAFKA_TIMEOUT))
 
-    def _get_highwater_offsets(self, kafka_hosts_ports):
-        """
-        Fetch highwater offsets for each topic/partition from Kafka cluster.
-
-        Do this for all partitions in the cluster because even if it has no
-        consumers, we may want to measure whether producers are successfully
-        producing. No need to limit this for performance because fetching broker
-        offsets from Kafka is a relatively inexpensive operation.
-        """
-        kafka_conn = SimpleClient(kafka_hosts_ports, timeout=self.kafka_timeout)
-        try:
-            broker_topics_partitions = kafka_conn.topics_to_brokers.keys()
-            # batch a bunch of requests into a single network call
-            offsets_request = [OffsetRequestPayload(topic, partition, -1, 1)
-                               for topic, partition in broker_topics_partitions]
-            offsets_response = kafka_conn.send_offset_request(offsets_request)
-            highwater_offsets = {(x.topic, x.partition): x.offsets[0] for x in offsets_response}
-        finally:
-            try:
-                kafka_conn.close()
-            except Exception:
-                self.log.exception('Error cleaning up Kafka connection')
-        return highwater_offsets
-
-    def _get_zk_path_children(self, zk_conn, zk_path, name_for_error):
-        """Fetch child nodes for a given Zookeeper path."""
-        children = []
-        try:
-            children = zk_conn.get_children(zk_path)
-        except NoNodeError:
-            self.log.info('No zookeeper node at %s', zk_path)
-        except Exception:
-            self.log.exception('Could not read %s from %s', name_for_error, zk_path)
-        return children
-
-    def _get_zk_consumer_offsets(self, zk_hosts_ports, consumer_groups=None, zk_prefix=''):
-        """
-        Fetch Consumer Group offsets from Zookeeper.
-
-        Also fetch consumer_groups, topics, and partitions if not
-        already specified in consumer_groups.
-
-        :param dict consumer_groups: The consumer groups, topics, and partitions
-            that you want to fetch offsets for. If consumer_groups is None, will
-            fetch offsets for all consumer_groups. For examples of what this
-            dict can look like, see _validate_consumer_groups().
- """ - zk_consumer_offsets = {} + def check(self, instance): + consumer_groups = self.read_config(instance, 'consumer_groups', + cast=self._validate_consumer_groups) + zk_connect_str = self.read_config(instance, 'zk_connect_str') + kafka_host_ports = self.read_config(instance, 'kafka_connect_str') # Construct the Zookeeper path pattern - # /consumers/[groupId]/offsets/[topic]/[partitionId] - zk_path_consumer = zk_prefix + '/consumers/' - zk_path_topic_tmpl = zk_path_consumer + '{group}/offsets/' - zk_path_partition_tmpl = zk_path_topic_tmpl + '{topic}/' + zk_prefix = instance.get('zk_prefix', '') + zk_path_tmpl = zk_prefix + '/consumers/%s/offsets/%s/%s' - zk_conn = KazooClient(zk_hosts_ports, timeout=self.zk_timeout) + # Connect to Zookeeper + zk_conn = KazooClient(zk_connect_str, timeout=self.zk_timeout) zk_conn.start() - try: - if consumer_groups is None: - # If consumer groups aren't specified, fetch them from ZK - consumer_groups = {consumer_group: None for consumer_group in - self._get_zk_path_children(zk_conn, zk_path_consumer, 'consumer groups')} - - for consumer_group, topics in consumer_groups.iteritems(): - if topics is None: - # If topics are't specified, fetch them from ZK - zk_path_topics = zk_path_topic_tmpl.format(group=consumer_group) - topics = {topic: None for topic in - self._get_zk_path_children(zk_conn, zk_path_topics, 'topics')} - for topic, partitions in topics.iteritems(): - if partitions is not None: - partitions = set(partitions) # defend against bad user input - else: - # If partitions aren't specified, fetch them from ZK - zk_path_partitions = zk_path_partition_tmpl.format( - group=consumer_group, topic=topic) - # Zookeeper returns the partition IDs as strings because - # they are extracted from the node path - partitions = [int(x) for x in self._get_zk_path_children( - zk_conn, zk_path_partitions, 'partitions')] - - # Fetch consumer offsets for each partition from ZK + try: + # Query Zookeeper for consumer offsets + consumer_offsets = {} + topics = defaultdict(set) + for consumer_group, topic_partitions in consumer_groups.iteritems(): + for topic, partitions in topic_partitions.iteritems(): + # Remember the topic partitions that we've see so that we can + # look up their broker offsets later + topics[topic].update(set(partitions)) for partition in partitions: - zk_path = (zk_path_partition_tmpl + '{partition}/').format( - group=consumer_group, topic=topic, partition=partition) + zk_path = zk_path_tmpl % (consumer_group, topic, partition) try: consumer_offset = int(zk_conn.get(zk_path)[0]) key = (consumer_group, topic, partition) - zk_consumer_offsets[key] = consumer_offset + consumer_offsets[key] = consumer_offset except NoNodeError: - self.log.info('No zookeeper node at %s', zk_path) + self.log.warn('No zookeeper node at %s' % zk_path) except Exception: - self.log.exception('Could not read consumer offset from %s', zk_path) + self.log.exception('Could not read consumer offset from %s' % zk_path) finally: try: zk_conn.stop() zk_conn.close() except Exception: self.log.exception('Error cleaning up Zookeeper connection') - return zk_consumer_offsets - def check(self, instance): - # For calculating lag, we have to fetch offsets from both kafka and - # zookeeper. There's a potential race condition because whichever one we - # check first may be outdated by the time we check the other. Better to - # check consumer offset before checking broker offset because worst case - # is that overstates consumer lag a little. 
-        # understate consumer lag to the point of having negative consumer lag,
-        # which just creates confusion because it's theoretically impossible.
-
-        # Fetch consumer group offsets from Zookeeper
-        zk_hosts_ports = self.read_config(instance, 'zk_connect_str')
-        zk_prefix = instance.get('zk_prefix', '')
+        # Connect to Kafka
+        kafka_conn = KafkaClient(kafka_host_ports, timeout=self.kafka_timeout)
 
-        # If monitor_unlisted_consumer_groups is True, fetch all groups stored in ZK
-        if instance.get('monitor_unlisted_consumer_groups', False):
-            consumer_groups = None
-        else:
-            consumer_groups = self.read_config(instance, 'consumer_groups',
-                                               cast=self._validate_consumer_groups)
-
-        consumer_offsets = self._get_zk_consumer_offsets(
-            zk_hosts_ports, consumer_groups, zk_prefix)
-
-        # Fetch the broker highwater offsets
-        kafka_hosts_ports = self.read_config(instance, 'kafka_connect_str')
-        highwater_offsets = self._get_highwater_offsets(kafka_hosts_ports)
+        try:
+            # Query Kafka for the broker offsets
+            broker_offsets = {}
+            for topic, partitions in topics.items():
+                offset_responses = kafka_conn.send_offset_request([
+                    OffsetRequest(topic, p, -1, 1) for p in partitions])
+
+                for resp in offset_responses:
+                    broker_offsets[(resp.topic, resp.partition)] = resp.offsets[0]
+        finally:
+            try:
+                kafka_conn.close()
+            except Exception:
+                self.log.exception('Error cleaning up Kafka connection')
 
-        # Report the broker highwater offset
-        for (topic, partition), highwater_offset in highwater_offsets.iteritems():
+        # Report the broker data
+        for (topic, partition), broker_offset in broker_offsets.items():
             broker_tags = ['topic:%s' % topic, 'partition:%s' % partition]
-            self.gauge('kafka.broker_offset', highwater_offset, tags=broker_tags)
+            broker_offset = broker_offsets.get((topic, partition))
+            self.gauge('kafka.broker_offset', broker_offset, tags=broker_tags)
 
-        # Report the consumer group offsets and consumer lag
-        for (consumer_group, topic, partition), consumer_offset in consumer_offsets.iteritems():
-            consumer_group_tags = ['topic:%s' % topic, 'partition:%s' % partition,
-                                   'consumer_group:%s' % consumer_group]
-            self.gauge('kafka.consumer_offset', consumer_offset, tags=consumer_group_tags)
-            if (topic, partition) not in highwater_offsets:
-                self.log.warn("Consumer offsets exist for topic: {topic} "
-                              "partition: {partition} but that topic partition doesn't "
-                              "actually exist in the cluster.".format(**locals()))
-                continue
-            consumer_lag = highwater_offsets[(topic, partition)] - consumer_offset
-            if consumer_lag < 0:
-                # This is a really bad scenario because new messages produced to
-                # the topic are never consumed by that particular consumer
-                # group. So still report the negative lag as a way of increasing
-                # visibility of the error.
-                title = "Consumer lag for consumer negative."
-                message = "Consumer lag for consumer group: {group}, topic: {topic}, " \
-                          "partition: {partition} is negative. This should never happen.".format(
This should never happen.".format( - group=consumer_group, - topic=topic, - partition=partition - ) - key = "{}:{}:{}".format(consumer_group, topic, partition) - self._send_event(title, message, consumer_group_tags, 'consumer_lag', key) - self.log.debug(message) + # Report the consumer + for (consumer_group, topic, partition), consumer_offset in consumer_offsets.items(): - self.gauge('kafka.consumer_lag', consumer_lag, - tags=consumer_group_tags) + # Get the broker offset + broker_offset = broker_offsets.get((topic, partition)) + + # Report the consumer offset and lag + tags = ['topic:%s' % topic, 'partition:%s' % partition, + 'consumer_group:%s' % consumer_group] + self.gauge('kafka.consumer_offset', consumer_offset, tags=tags) + self.gauge('kafka.consumer_lag', broker_offset - consumer_offset, + tags=tags) # Private config validation/marshalling functions def _validate_consumer_groups(self, val): - # val = {'consumer_group': {'topic': [0, 1]}} try: - # consumer groups are optional - assert isinstance(val, dict) or val is None - if val is not None: - for consumer_group, topics in val.iteritems(): - assert isinstance(consumer_group, basestring) - # topics are optional - assert isinstance(topics, dict) or topics is None - if topics is not None: - for topic, partitions in topics.iteritems(): - assert isinstance(topic, basestring) - # partitions are optional - assert isinstance(partitions, (list, tuple)) or partitions is None - if partitions is not None: - for partition in partitions: - assert isinstance(partition, int) + consumer_group, topic_partitions = val.items()[0] + assert isinstance(consumer_group, (str, unicode)) + topic, partitions = topic_partitions.items()[0] + assert isinstance(topic, (str, unicode)) + assert isinstance(partitions, (list, tuple)) return val except Exception as e: self.log.exception(e) - raise Exception("""The `consumer_groups` value must be a mapping of mappings, like this: + raise Exception('''The `consumer_groups` value must be a mapping of mappings, like this: consumer_groups: myconsumer0: # consumer group name - mytopic0: [0, 1] # topic_name: list of partitions + mytopic0: [0, 1] # topic: list of partitions myconsumer1: mytopic0: [0, 1, 2] mytopic1: [10, 12] - myconsumer2: - mytopic0: - myconsumer3: - -Note that each level of values is optional. Any omitted values will be fetched from Zookeeper. -You can omit partitions (example: myconsumer2), topics (example: myconsumer3), and even consumer_groups. -If you omit consumer_groups, you must set the flag 'monitor_unlisted_consumer_groups': True. -If a value is omitted, the parent value must still be it's expected type (typically a dict). -""") - - def _send_event(self, title, text, tags, type, aggregation_key): - event_dict = { - 'timestamp': int(time.time()), - 'source_type_name': self.SOURCE_TYPE_NAME, - 'msg_title': title, - 'event_type': type, - 'msg_text': text, - 'tags': tags, - 'aggregation_key': aggregation_key, - } - - self.event(event_dict) +''') diff --git a/kafka_consumer/conf.yaml.example b/kafka_consumer/conf.yaml.example index 23e441cf10fbd..2dc68c090d5a8 100644 --- a/kafka_consumer/conf.yaml.example +++ b/kafka_consumer/conf.yaml.example @@ -18,10 +18,6 @@ instances: # consumer_groups: # my_consumer: # my_topic: [0, 1, 4, 12] - # # Setting monitor_unlisted_consumer_groups to True will tell the check to discover - # # and fetch all offsets for all consumer groups stored in zookeeper. While this is - # # often convenient, it can also put a lot of load on zookeeper, so use judiciously. 
-  #   monitor_unlisted_consumer_groups: False
 
   # Production example with redundant hosts:
   # In a production environment, it's often useful to specify multiple
diff --git a/kafka_consumer/test_kafka_consumer.py b/kafka_consumer/test_kafka_consumer.py
index 25d7ce195e278..c8fe7d601f025 100644
--- a/kafka_consumer/test_kafka_consumer.py
+++ b/kafka_consumer/test_kafka_consumer.py
@@ -3,10 +3,9 @@
 # Licensed under Simplified BSD License (see LICENSE)
 
 # stdlib
-import copy
+from nose.plugins.attrib import attr
 
 # 3p
-from nose.plugins.attrib import attr
 
 # project
 from tests.checks.common import AgentCheckTest
@@ -46,19 +45,3 @@ def test_check(self):
         self.assertMetric(mname, at_least=1)
 
         self.coverage_report()
-
-
-    def test_check_nogroups(self):
-        """
-        Testing Kafka_consumer check grabbing groups from ZK
-        """
-        nogroup_instance = copy.copy(instance)
-        nogroup_instance[0].pop('consumer_groups')
-        nogroup_instance[0]['monitor_unlisted_consumer_groups'] = True
-
-        self.run_check({'instances': instance})
-
-        for mname in METRICS:
-            self.assertMetric(mname, at_least=1)
-
-        self.coverage_report()
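
Reviewer note: the rewritten `check()` reads each consumer offset from a Zookeeper node whose path is built from `zk_path_tmpl = zk_prefix + '/consumers/%s/offsets/%s/%s'`. A minimal sketch of how a `consumer_groups` mapping from `conf.yaml.example` expands into those paths (the sample values are illustrative, not part of the patch):

```python
# Sketch: expand a consumer_groups mapping into the Zookeeper paths
# that check() queries. Sample data only; real values come from conf.yaml.
consumer_groups = {
    'my_consumer': {
        'my_topic': [0, 1, 4, 12],
    },
}
zk_prefix = ''  # the optional zk_prefix instance setting
zk_path_tmpl = zk_prefix + '/consumers/%s/offsets/%s/%s'

for group, topic_partitions in consumer_groups.items():
    for topic, partitions in topic_partitions.items():
        for partition in partitions:
            # e.g. /consumers/my_consumer/offsets/my_topic/0
            print(zk_path_tmpl % (group, topic, partition))
```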
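On the broker side, `OffsetRequest(topic, p, -1, 1)` asks each listed partition for its single latest (highwater) offset, and lag is then computed inline as `broker_offset - consumer_offset`, where `broker_offsets.get((topic, partition))` returns `None` for any partition Kafka did not answer for. A standalone sketch of that arithmetic with a guard for the `None` case (the dict shapes mirror the patch; the numbers are made up):

```python
# Sketch: compute consumer lag from the two offset maps built in check().
consumer_offsets = {
    # (consumer_group, topic, partition) -> committed offset (from Zookeeper)
    ('my_consumer', 'my_topic', 0): 95,
    ('my_consumer', 'my_topic', 1): 120,
}
broker_offsets = {
    # (topic, partition) -> latest/highwater offset (from Kafka)
    ('my_topic', 0): 100,
    ('my_topic', 1): 120,
}

for (group, topic, partition), consumer_offset in consumer_offsets.items():
    broker_offset = broker_offsets.get((topic, partition))
    if broker_offset is None:
        # Partition missing from the broker response; skipping avoids a
        # TypeError on the subtraction below (the patch itself does not guard this).
        continue
    lag = broker_offset - consumer_offset
    print('%s %s:%s lag=%d' % (group, topic, partition, lag))
```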