diff --git a/kafka_consumer/check.py b/kafka_consumer/check.py
index 872b3745b2166..26d99d103c44d 100644
--- a/kafka_consumer/check.py
+++ b/kafka_consumer/check.py
@@ -6,11 +6,14 @@
 import random
 import time
 from collections import defaultdict
+from time import sleep

 # 3p
 from kafka.client import KafkaClient
-from kafka.protocol.offset import OffsetRequest, OffsetResponse_v0
+from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
 from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
+from kafka.structs import TopicPartition
+import kafka.errors as KafkaErrors
 from kazoo.client import KazooClient
 from kazoo.exceptions import NoNodeError
@@ -18,6 +21,11 @@
 from checks import AgentCheck
 from config import _is_affirmative

+# Kafka protocol error codes
+KAFKA_NO_ERROR = 0
+KAFKA_UNKNOWN_ERROR = -1
+KAFKA_UNKNOWN_TOPIC_OR_PARTITION = 3
+KAFKA_NOT_LEADER_FOR_PARTITION = 6

 DEFAULT_KAFKA_TIMEOUT = 5
 DEFAULT_ZK_TIMEOUT = 5
@@ -69,14 +77,15 @@ def check(self, instance):
         if instance.get('monitor_unlisted_consumer_groups', False):
             consumer_groups = None
         elif 'consumer_groups' in instance:
-            consumer_groups = self.read_config(instance, 'consumer_groups',
-                                               cast=self._validate_consumer_groups)
+            consumer_groups = self._read_config(instance, 'consumer_groups',
+                                                cast=self._validate_consumer_groups)

         zk_consumer_offsets = None
         if zk_hosts_ports:
             zk_consumer_offsets, consumer_groups = self._get_zk_consumer_offsets(
                 zk_hosts_ports, consumer_groups, zk_prefix)

+        topics = defaultdict(set)
         kafka_consumer_offsets = None
         kafka_version = self._get_kafka_version(self._get_kafka_client(instance))
         if collect_kafka_consumer_offsets:
@@ -85,45 +94,38 @@ def check(self, instance):
                 raise Exception('Invalid configuration - if you\'re not collecting '
                                 'offsets from ZK you _must_ specify consumer groups')
             if self._kafka_compatible(kafka_version):
-                _, kafka_consumer_offsets = self._get_kafka_consumer_offsets(instance, consumer_groups)
-
+                kafka_consumer_offsets, topics = self._get_kafka_consumer_offsets(instance, consumer_groups)
+
+        if not topics:
+            # consumer_groups is shaped like {'consumer_group': {'topic': [0, 1]}}
+            for _, topic_partitions in consumer_groups.iteritems():
+                for topic, partitions in topic_partitions.iteritems():
+                    topics[topic].update(partitions)
+
+        warn_msg = ('Discovered %s partition contexts - this exceeds the maximum '
+                    'number of contexts permitted by the check. Please narrow your '
+                    'target by specifying in your YAML what consumer groups, topics '
+                    'and partitions you wish to monitor.')
         if zk_consumer_offsets and len(zk_consumer_offsets) > self.context_limit:
-            self.warning("Discovered %s partition contexts - this exceeds the maximum "
-                         "number of contexts permitted by the check. Please narrow your "
-                         "target by specifying in your YAML what consumer groups, topics "
-                         "and partitions you wish to monitor." % len(zk_consumer_offsets))
+            self.warning(warn_msg % len(zk_consumer_offsets))
            return

         if kafka_consumer_offsets and len(kafka_consumer_offsets) > self.context_limit:
-            self.warning("Discovered %s partition contexts - this exceeds the maximum "
-                         "number of contexts permitted by the check. Please narrow your "
-                         "target by specifying in your YAML what consumer groups, topics "
-                         "and partitions you wish to monitor." % len(kafka_consumer_offsets))
+            self.warning(warn_msg % len(kafka_consumer_offsets))
             return

         # Fetch the broker highwater offsets
-        highwater_offsets, topic_partitions_without_a_leader = self._get_broker_offsets(instance)
+        try:
+            highwater_offsets, topic_partitions_without_a_leader = self._get_broker_offsets(instance, topics)
+        except Exception:
+            self.log.exception('There was a problem collecting the high watermark offsets')
+            return
+
         # Report the broker highwater offset
         for (topic, partition), highwater_offset in highwater_offsets.iteritems():
             broker_tags = ['topic:%s' % topic, 'partition:%s' % partition]
             self.gauge('kafka.broker_offset', highwater_offset, tags=broker_tags)

-        # Report the consumer group offsets and consumer lag
-        # for (consumer_group, topic, partition), consumer_offset in consumer_offsets.iteritems():
-        #     if (topic, partition) not in highwater_offsets:
-        #         self.log.warn("[%s] topic: %s partition: %s was not available in the consumer "
-        #                       "- skipping consumer submission", consumer_group, topic, partition)
-        #         continue
-
-        #     consumer_group_tags = ['topic:%s' % topic, 'partition:%s' % partition,
-        #                            'consumer_group:%s' % consumer_group]
-        #     if (topic, partition) not in highwater_offsets:
-        #         if (topic, partition) not in topic_partitions_without_a_leader:
-        #             self.log.warn("Consumer group: %s has offsets for topic: %s "
-        #                           "partition: %s, but that topic partition doesn't actually "
-        #                           "exist in the cluster.", consumer_group, topic, partition)
-        #         continue
-
         # Report the consumer group offsets and consumer lag
         if zk_consumer_offsets:
             self._report_consumer_metrics(highwater_offsets, zk_consumer_offsets,
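A note on the `if not topics` fallback in the hunk above: when the Kafka-stored offsets were not collected, the topic/partition index is rebuilt from the configured consumer groups. A minimal sketch of the data shapes involved, based on the check's own structure comment (group and topic names here are invented for illustration):

    from collections import defaultdict

    # consumer_groups mirrors the instance YAML: group -> topic -> partition list;
    # an empty list is taken to mean "all partitions of that topic"
    consumer_groups = {
        'my_consumer': {'my_topic': [0, 1]},
        'another_consumer': {'another_topic': []},
    }

    topics = defaultdict(set)
    for _, topic_partitions in consumer_groups.iteritems():
        for topic, partitions in topic_partitions.iteritems():
            topics[topic].update(partitions)

    # topics == {'my_topic': set([0, 1]), 'another_topic': set()}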
@@ -142,10 +144,10 @@ def stop(self):
             cli.close()

     def _get_instance_key(self, instance):
-        return self.read_config(instance, 'kafka_connect_str')
+        return self._read_config(instance, 'kafka_connect_str')

     def _get_kafka_client(self, instance):
-        kafka_conn_str = self.read_config(instance, 'kafka_connect_str')
+        kafka_conn_str = self._read_config(instance, 'kafka_connect_str')
         if not kafka_conn_str:
             raise Exception('Bad instance')
@@ -179,10 +181,35 @@ def _make_req_async(self, client, request, nodeid=None, cb=None):
         if cb:
             future.add_callback(cb, request, nodeid, self.current_ts)

+    def _ensure_ready_node(self, client, nodeid):
+        if nodeid is None:
+            raise Exception("nodeid is None")
+
+        attempts = 0
+        while not client.ready(nodeid):
+            if attempts > DEFAULT_KAFKA_RETRIES:
+                self.log.error("unable to connect to broker id: %s after %s attempts",
+                               nodeid, DEFAULT_KAFKA_RETRIES)
+                break
+            attempts += 1
+            # exponential backoff starting at 20 ms, plus up to 10 ms of jitter
+            delay = (2 ** attempts + random.random()) * 0.01
+            self.log.info("broker id: %s is not ready yet, sleeping for %f ms", nodeid, delay * 1000)
+
+            future = client.cluster.request_update()
+            client.poll(future=future)
+            if future.failed():
+                if future.retriable():
+                    if isinstance(future.exception, KafkaErrors.NodeNotReadyError):
+                        sleep(delay)
+                        continue
+
+                raise future.exception
+
     def _make_blocking_req(self, client, request, nodeid=None):
         if not nodeid:
             nodeid = self._get_random_node_id(client)

+        self._ensure_ready_node(client, nodeid)
+
         future = client.send(nodeid, request)
         client.poll(future=future)  # block until we get response.
         assert future.succeeded()
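The retry loop in `_ensure_ready_node` above uses exponential backoff with jitter so repeated metadata polls don't hammer a broker that is still coming up. A standalone sketch of just the delay schedule; `DEFAULT_KAFKA_RETRIES` exists in the check but its value is not shown in this patch, so the 3 below is an assumption:

    import random
    from time import sleep

    DEFAULT_KAFKA_RETRIES = 3  # assumed value, for illustration only

    def backoff_delays(retries=DEFAULT_KAFKA_RETRIES):
        # attempt 1 -> ~20 ms, attempt 2 -> ~40 ms, attempt 3 -> ~80 ms,
        # each with up to 10 ms of random jitter to avoid thundering herds
        return [(2 ** attempt + random.random()) * 0.01
                for attempt in range(1, retries + 1)]

    for delay in backoff_delays():
        sleep(delay)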
@@ -210,10 +237,11 @@ def _get_group_coordinator(self, client, group):
         for broker in client.cluster.brokers():
             try:
                 coord_resp = self._make_blocking_req(client, request, nodeid=broker.nodeId)
-                if coord_resp and coord_resp.error_code is not 0:
+                # an error_code of 0 (KAFKA_NO_ERROR) means the lookup succeeded
+                if coord_resp and coord_resp.error_code == KAFKA_NO_ERROR:
                     client.cluster.add_group_coordinator(group, coord_resp)
                     coord_id = client.cluster.coordinator_for_group(group)
-                    if coord_id > 0:
+                    if coord_id >= 0:
                         return coord_id
             except AssertionError:
                 continue
@@ -230,18 +258,18 @@ def _process_highwater_offsets(self, request, instance, nodeid, response):
             topic = tp[0]
             partitions = tp[1]
             for partition, error_code, offsets in partitions:
-                if error_code == 0:
+                if error_code == KAFKA_NO_ERROR:
                     highwater_offsets[(topic, partition)] = offsets[0]
                 # Valid error codes:
                 # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-PossibleErrorCodes.2
-                elif error_code == -1:
+                elif error_code == KAFKA_UNKNOWN_ERROR:
                     self.log.error("Kafka broker returned UNKNOWN (error_code -1) for topic: %s, partition: %s. "
                                    "This should never happen.", topic, partition)
-                elif error_code == 3:
+                elif error_code == KAFKA_UNKNOWN_TOPIC_OR_PARTITION:
                     self.log.warn("Kafka broker returned UNKNOWN_TOPIC_OR_PARTITION (error_code 3) for "
                                   "topic: %s, partition: %s. This should only happen if the topic is currently being deleted.",
                                   topic, partition)
-                elif error_code == 6:
+                elif error_code == KAFKA_NOT_LEADER_FOR_PARTITION:
                     self.log.warn("Kafka broker returned NOT_LEADER_FOR_PARTITION (error_code 6) for "
                                   "topic: %s, partition: %s. This should only happen if the broker that was the partition "
                                   "leader when kafka_client.cluster last fetched metadata is no longer the leader.",
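The named error-code constants replace the bare integers these branches used to compare against. If more codes ever need handling, kafka-python's errors module can also resolve a raw code to its error class; a hedged sketch, assuming the `for_code` helper shipped in recent kafka-python releases:

    import kafka.errors as KafkaErrors

    def describe_error(error_code):
        # for_code falls back to UnknownError for codes it doesn't recognize,
        # e.g. 3 -> UnknownTopicOrPartitionError, 6 -> NotLeaderForPartitionError
        return KafkaErrors.for_code(error_code).__name__

    # describe_error(3) == 'UnknownTopicOrPartitionError'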
@@ -250,7 +278,7 @@ def _process_highwater_offsets(self, request, instance, nodeid, response):

         return highwater_offsets, topic_partitions_without_a_leader

-    def _get_broker_offsets(self, instance):
+    def _get_broker_offsets(self, instance, topics):
         """
         Fetch highwater offsets for each topic/partition from Kafka cluster.
@@ -272,69 +300,59 @@ def _get_broker_offsets(self, instance):
         # Connect to Kafka
         highwater_offsets = {}
         topic_partitions_without_a_leader = []
+        topics_to_fetch = defaultdict(set)
         cli = self._get_kafka_client(instance)

-        try:
-            # store partitions that exist but unable to fetch offsets for later
-            # error checking
-            processed = []
-            pending = set([broker.nodeId for broker in cli.cluster.brokers()])
-            for _ in range(self._broker_retries):
-                if len(pending) == 0:
-                    break
-
-                for node in processed:
-                    pending.remove(node)
-
-                processed = []
-                for nodeId in pending:
-                    if not cli.ready(nodeId):
-                        self.log.debug('kafka broker (%s) unavailable this iteration - skipping', nodeId)
-                        continue
-                    # Group partitions by topic in order to construct the OffsetRequest
-                    self.log.debug('kafka broker (%s) getting processed...', nodeId)
-                    partitions_grouped_by_topic = defaultdict(list)
-                    # partitions_for_broker returns all partitions for which this
-                    # broker is leader. So any partitions that don't currently have
-                    # leaders will be missed. Ignore as they'll be caught on next check run.
-                    broker_partitions = cli.cluster.partitions_for_broker(nodeId)
-                    if broker_partitions:
-                        for topic, partition in broker_partitions:
-                            partitions_grouped_by_topic[topic].append(partition)
-
-                    # Construct the OffsetRequest
-                    timestamp = -1  # -1 for latest, -2 for earliest
-                    max_offsets = 1
-                    request = OffsetRequest[0](
-                        replica_id=-1,
-                        topics=[
-                            (topic, [
-                                (partition, timestamp, max_offsets) for partition in partitions])
-                            for topic, partitions in partitions_grouped_by_topic.iteritems()])
-
-                    response = self._make_blocking_req(cli, request, nodeid=nodeId)
-                    offsets, unled = self._process_highwater_offsets(request, instance, nodeId, response)
-                    highwater_offsets.update(offsets)
-                    topic_partitions_without_a_leader.extend(unled)
-
-                    processed.append(nodeId)
-        except Exception:
-            self.log.exception('There was a problem collecting the high watermark offsets')
+        for topic, partitions in topics.iteritems():
+            # if a topic has no configured partitions, fall back to
+            # all of its currently available partitions
+            if len(partitions) == 0:
+                partitions = cli.cluster.available_partitions_for_topic(topic)
+            topics_to_fetch[topic].update(partitions)
+
+        # group the requested partitions by the broker that currently leads them,
+        # so each leader receives a single OffsetRequest covering its partitions
+        leader_tp = defaultdict(lambda: defaultdict(set))
+        for topic, partitions in topics_to_fetch.iteritems():
+            for partition in partitions:
+                partition_leader = cli.cluster.leader_for_partition(TopicPartition(topic, partition))
+                if partition_leader is not None and partition_leader > -1:
+                    leader_tp[partition_leader][topic].add(partition)
+
+        max_offsets = 1
+        for nodeId, tps in leader_tp.iteritems():
+            # Construct the OffsetRequest
+            request = OffsetRequest[0](
+                replica_id=-1,
+                topics=[
+                    (topic, [
+                        (partition, OffsetResetStrategy.LATEST, max_offsets) for partition in partitions])
+                    for topic, partitions in tps.iteritems()])
+
+            response = self._make_blocking_req(cli, request, nodeid=nodeId)
+            offsets, unled = self._process_highwater_offsets(request, instance, nodeId, response)
+            highwater_offsets.update(offsets)
+            topic_partitions_without_a_leader.extend(unled)

         return highwater_offsets, list(set(topic_partitions_without_a_leader))

     def _report_consumer_metrics(self, highwater_offsets, consumer_offsets,
                                  unled_topic_partitions=[], tags=[]):
         for (consumer_group, topic, partition), consumer_offset in consumer_offsets.iteritems():
-            consumer_group_tags = ['topic:%s' % topic, 'partition:%s' % partition,
-                                   'consumer_group:%s' % consumer_group] + tags
+            # Report the consumer group offsets and consumer lag
             if (topic, partition) not in highwater_offsets:
+                self.log.warn("[%s] topic: %s partition: %s has no reported highwater offset "
+                              "- skipping consumer submission", consumer_group, topic, partition)
                 if (topic, partition) not in unled_topic_partitions:
                     self.log.warn("Consumer group: %s has offsets for topic: %s "
                                   "partition: %s, but that topic partition doesn't actually "
                                   "exist in the cluster.", consumer_group, topic, partition)
                 continue

+            consumer_group_tags = ['topic:%s' % topic, 'partition:%s' % partition,
+                                   'consumer_group:%s' % consumer_group] + tags
             self.gauge('kafka.consumer_offset', consumer_offset, tags=consumer_group_tags)
+
             consumer_lag = highwater_offsets[(topic, partition)] - consumer_offset
             if consumer_lag < 0:
                 # this will result in data loss, so emit an event for max visibility
@@ -435,6 +453,10 @@ def _get_zk_consumer_offsets(self, zk_hosts_ports, consumer_groups=None, zk_pref
         return zk_consumer_offsets, consumer_groups

     def _get_kafka_consumer_offsets(self, instance, consumer_groups):
+        """
+        Retrieve consumer offsets via the new consumer API. These offsets are stored
+        directly in Kafka (in the __consumer_offsets topic) rather than in ZooKeeper.
+        """
         consumer_offsets = {}
         topics = defaultdict(set)
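Consumer lag, as computed in `_report_consumer_metrics` above, is simply the broker's highwater mark minus the group's committed offset. A tiny worked example (all values invented for illustration):

    highwater_offsets = {('my_topic', 0): 1050}  # broker has written 1050 messages
    consumer_offset = 1000                       # group has committed through 1000

    consumer_lag = highwater_offsets[('my_topic', 0)] - consumer_offset
    assert consumer_lag == 50  # the group is 50 messages behind

    # a negative lag would mean the committed offset is ahead of the broker,
    # which implies data loss - the check emits an event in that case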
@@ -456,13 +478,15 @@ def _get_kafka_consumer_offsets(self, instance, consumer_groups):
         except Exception:
             self.log.exception('Could not read consumer offsets from kafka.')

-        return topics, consumer_offsets
+        return consumer_offsets, topics

     def _get_consumer_offsets(self, client, consumer_group, topic_partitions, coord_id=None):
         # version = client.check_version(coord_id)
         tps = defaultdict(set)
         for topic, partitions in topic_partitions.iteritems():
+            if len(partitions) == 0:
+                partitions = client.cluster.available_partitions_for_topic(topic)
             tps[topic] = tps[unicode(topic)].union(set(partitions))

         # TODO: find reliable way to decide what API version to use for
@@ -481,12 +505,15 @@ def _get_consumer_offsets(self, client, consumer_group, topic_partitions, coord_
         return consumer_offsets

     @staticmethod
-    def _read_config(instance, key):
+    def _read_config(instance, key, cast=None):
         val = instance.get(key)
         if val is None:
             raise Exception('Must provide `%s` value in instance config' % key)

-        return val
+        if cast is None:
+            return val
+
+        return cast(val)

     def _validate_consumer_groups(self, val):
         # val = {'consumer_group': {'topic': [0, 1]}}
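The new `cast` hook on `_read_config` keeps validation next to the config read, which is how `check()` passes `cast=self._validate_consumer_groups`. A hedged usage sketch with a standalone copy of the static method (the instance dict and the str.upper cast are invented for illustration):

    def _read_config(instance, key, cast=None):
        # standalone copy of the static method, for illustration only
        val = instance.get(key)
        if val is None:
            raise Exception('Must provide `%s` value in instance config' % key)
        return val if cast is None else cast(val)

    instance = {'kafka_connect_str': 'localhost:9092'}

    # without cast: the raw value is returned (or an exception raised if missing)
    assert _read_config(instance, 'kafka_connect_str') == 'localhost:9092'

    # with cast: the value is passed through the callable on the way out
    assert _read_config(instance, 'kafka_connect_str', cast=str.upper) == 'LOCALHOST:9092'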