[kafka_consumer] be less smart about what we collect.
truthbk committed Oct 13, 2017
1 parent 06987ea commit aa9f015
Showing 1 changed file with 117 additions and 90 deletions.
207 changes: 117 additions & 90 deletions kafka_consumer/check.py
@@ -6,18 +6,26 @@
import random
import time
from collections import defaultdict
from time import time, sleep

# 3p
from kafka.client import KafkaClient
from kafka.protocol.offset import OffsetRequest, OffsetResponse_v0
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
from kafka.structs import TopicPartition
import kafka.errors as KafkaErrors
from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError

# project
from checks import AgentCheck
from config import _is_affirmative

# Kafka Errors
KAFKA_NO_ERROR = 0
KAFKA_UNKNOWN_ERROR = -1
KAFKA_UNKNOWN_TOPIC_OR_PARTITION = 3
KAFKA_NOT_LEADER_FOR_PARTITION = 6

DEFAULT_KAFKA_TIMEOUT = 5
DEFAULT_ZK_TIMEOUT = 5
@@ -69,14 +77,15 @@ def check(self, instance):
if instance.get('monitor_unlisted_consumer_groups', False):
consumer_groups = None
elif 'consumer_groups' in instance:
consumer_groups = self.read_config(instance, 'consumer_groups',
cast=self._validate_consumer_groups)
consumer_groups = self._read_config(instance, 'consumer_groups',
cast=self._validate_consumer_groups)

zk_consumer_offsets = None
if zk_hosts_ports:
zk_consumer_offsets, consumer_groups = self._get_zk_consumer_offsets(
zk_hosts_ports, consumer_groups, zk_prefix)

topics = defaultdict(set)
kafka_consumer_offsets = None
kafka_version = self._get_kafka_version(self._get_kafka_client(instance))
if collect_kafka_consumer_offsets:
@@ -85,45 +94,38 @@ def check(self, instance):
raise Exception('Invalid configuration - if you\'re not collecting '
'offsets from ZK you _must_ specify consumer groups')
if self._kafka_compatible(kafka_version):
_, kafka_consumer_offsets = self._get_kafka_consumer_offsets(instance, consumer_groups)

kafka_consumer_offsets, topics = self._get_kafka_consumer_offsets(instance, consumer_groups)

if not topics:
# val = {'consumer_group': {'topic': [0, 1]}}
for _, topic_partitions in consumer_groups.iteritems():
for topic, partitions in topic_partitions.iteritems():
topics[topic].update(partitions)

warn_msg = """ Discovered %s partition contexts - this exceeds the maximum
number of contexts permitted by the check. Please narrow your
target by specifying in your YAML what consumer groups, topics
and partitions you wish to monitor."""
if zk_consumer_offsets and len(zk_consumer_offsets) > self.context_limit:
self.warning("Discovered %s partition contexts - this exceeds the maximum "
"number of contexts permitted by the check. Please narrow your "
"target by specifying in your YAML what consumer groups, topics "
"and partitions you wish to monitor." % len(zk_consumer_offsets))
self.warning(warn_msg % len(zk_consumer_offsets))
return
if kafka_consumer_offsets and len(kafka_consumer_offsets) > self.context_limit:
self.warning("Discovered %s partition contexts - this exceeds the maximum "
"number of contexts permitted by the check. Please narrow your "
"target by specifying in your YAML what consumer groups, topics "
"and partitions you wish to monitor." % len(kafka_consumer_offsets))
self.warning(warn_msg % len(kafka_consumer_offsets))
return

# Fetch the broker highwater offsets
highwater_offsets, topic_partitions_without_a_leader = self._get_broker_offsets(instance)
try:
highwater_offsets, topic_partitions_without_a_leader = self._get_broker_offsets(instance, topics)
except Exception:
self.log.exception('There was a problem collecting the high watermark offsets')
return


# Report the broker highwater offset
for (topic, partition), highwater_offset in highwater_offsets.iteritems():
broker_tags = ['topic:%s' % topic, 'partition:%s' % partition]
self.gauge('kafka.broker_offset', highwater_offset, tags=broker_tags)
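# Illustrative datapoint (hypothetical values): this emits
# kafka.broker_offset = 5000 tagged ['topic:my_topic', 'partition:0']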

# Report the consumer group offsets and consumer lag
# for (consumer_group, topic, partition), consumer_offset in consumer_offsets.iteritems():
# if (topic, partition) not in highwater_offsets:
# self.log.warn("[%s] topic: %s partition: %s was not available in the consumer "
# "- skipping consumer submission", consumer_group, topic, partition)
# continue

# consumer_group_tags = ['topic:%s' % topic, 'partition:%s' % partition,
# 'consumer_group:%s' % consumer_group]
# if (topic, partition) not in highwater_offsets:
# if (topic, partition) not in topic_partitions_without_a_leader:
# self.log.warn("Consumer group: %s has offsets for topic: %s "
# "partition: %s, but that topic partition doesn't actually "
# "exist in the cluster.", consumer_group, topic, partition)
# continue

# Report the consumer group offsets and consumer lag
if zk_consumer_offsets:
self._report_consumer_metrics(highwater_offsets, zk_consumer_offsets,
@@ -142,10 +144,10 @@ def stop(self):
cli.close()

def _get_instance_key(self, instance):
return self.read_config(instance, 'kafka_connect_str')
return self._read_config(instance, 'kafka_connect_str')

def _get_kafka_client(self, instance):
kafka_conn_str = self.read_config(instance, 'kafka_connect_str')
kafka_conn_str = self._read_config(instance, 'kafka_connect_str')
if not kafka_conn_str:
raise Exception('Bad instance')

@@ -179,10 +181,35 @@ def _make_req_async(self, client, request, nodeid=None, cb=None):
if cb:
future.add_callback(cb, request, nodeid, self.current_ts)

def _ensure_ready_node(self, client, nodeid):
if nodeid is None:
raise Exception("nodeid is None")

attempts = 0
while not client.ready(nodeid):
if attempts > DEFAULT_KAFKA_RETRIES:
self.log.error("unable to connect to broker id: {} after {} attempts".format(nodeid, DEFAULT_KAFKA_RETRIES))
break
attempts += 1
delay = ((2 ** attempts) + random.randint(0, 1000) / 1000.0) * 0.01 # exponential backoff with jitter, starting at ~20 ms
self.log.info("broker id: %s is not ready yet, sleeping for %f ms", nodeid, delay * 1000)

future = client.cluster.request_update()
client.poll(future=future)
if future.failed():
if future.retriable():
if isinstance(future.exception, KafkaErrors.NodeNotReadyError):
sleep(delay)
continue

raise future.exception

def _make_blocking_req(self, client, request, nodeid=None):
if nodeid is None:
nodeid = self._get_random_node_id(client)

self._ensure_ready_node(client, nodeid)

future = client.send(nodeid, request)
client.poll(future=future) # block until we get response.
assert future.succeeded()
@@ -210,10 +237,11 @@ def _get_group_coordinator(self, client, group):
for broker in client.cluster.brokers():
try:
coord_resp = self._make_blocking_req(client, request, nodeid=broker.nodeId)
if coord_resp and coord_resp.error_code is not 0:
# 0 means that there is no error
if coord_resp and coord_resp.error_code == KAFKA_NO_ERROR:
client.cluster.add_group_coordinator(group, coord_resp)
coord_id = client.cluster.coordinator_for_group(group)
if coord_id > 0:
if coord_id >= 0:
return coord_id
except AssertionError:
continue
@@ -230,18 +258,18 @@ def _process_highwater_offsets(self, request, instance, nodeid, response):
topic = tp[0]
partitions = tp[1]
for partition, error_code, offsets in partitions:
if error_code == 0:
if error_code == KAFKA_NO_ERROR:
highwater_offsets[(topic, partition)] = offsets[0]
# Valid error codes:
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-PossibleErrorCodes.2
elif error_code == -1:
elif error_code == KAFKA_UNKNOWN_ERROR:
self.log.error("Kafka broker returned UNKNOWN (error_code -1) for topic: %s, partition: %s. "
"This should never happen.", topic, partition)
elif error_code == 3:
elif error_code == KAFKA_UNKNOWN_TOPIC_OR_PARTITION:
self.log.warn("Kafka broker returned UNKNOWN_TOPIC_OR_PARTITION (error_code 3) for "
"topic: %s, partition: %s. This should only happen if the topic is currently being deleted.",
topic, partition)
elif error_code == 6:
elif error_code == KAFKA_NOT_LEADER_FOR_PARTITION:
self.log.warn("Kafka broker returned NOT_LEADER_FOR_PARTITION (error_code 6) for "
"topic: %s, partition: %s. This should only happen if the broker that was the partition "
"leader when kafka_client.cluster last fetched metadata is no longer the leader.",
@@ -250,7 +278,7 @@ def _process_highwater_offsets(self, request, instance, nodeid, response):

return highwater_offsets, topic_partitions_without_a_leader

def _get_broker_offsets(self, instance):
def _get_broker_offsets(self, instance, topics):
"""
Fetch highwater offsets for each topic/partition from Kafka cluster.
@@ -272,69 +300,59 @@ def _get_broker_offsets(self, instance):
# Connect to Kafka
highwater_offsets = {}
topic_partitions_without_a_leader = []
topics_to_fetch = defaultdict(set)
cli = self._get_kafka_client(instance)
try:
# store partitions that exist but unable to fetch offsets for later
# error checking
processed = []
pending = set([broker.nodeId for broker in cli.cluster.brokers()])
for _ in range(self._broker_retries):
if len(pending) == 0:
break

for node in processed:
pending.remove(node)

processed = []
for nodeId in pending:
if not cli.ready(nodeId):
self.log.debug('kafka broker (%s) unavailable this iteration - skipping', nodeId)
continue

# Group partitions by topic in order to construct the OffsetRequest
self.log.debug('kafka broker (%s) getting processed...', nodeId)
partitions_grouped_by_topic = defaultdict(list)
# partitions_for_broker returns all partitions for which this
# broker is leader. So any partitions that don't currently have
# leaders will be missed. Ignore as they'll be caught on next check run.
broker_partitions = cli.cluster.partitions_for_broker(nodeId)
if broker_partitions:
for topic, partition in broker_partitions:
partitions_grouped_by_topic[topic].append(partition)

# Construct the OffsetRequest
timestamp = -1 # -1 for latest, -2 for earliest
max_offsets = 1
request = OffsetRequest[0](
replica_id=-1,
topics=[
(topic, [
(partition, timestamp, max_offsets) for partition in partitions])
for topic, partitions in partitions_grouped_by_topic.iteritems()])

response = self._make_blocking_req(cli, request, nodeid=nodeId)
offsets, unled = self._process_highwater_offsets(request, instance, nodeId, response)
highwater_offsets.update(offsets)
topic_partitions_without_a_leader.extend(unled)

processed.append(nodeId)
except Exception:
self.log.exception('There was a problem collecting the high watermark offsets')
for topic, partitions in topics.iteritems():
# if no partitions are provided,
# fall back to all the available partitions for the topic
if len(partitions) == 0:
partitions = cli.cluster.available_partitions_for_topic(topic)
topics_to_fetch[topic].update(partitions)


leader_tp = defaultdict(lambda: defaultdict(set))
for topic, partitions in topics_to_fetch.iteritems():
for partition in partitions:
partition_leader = cli.cluster.leader_for_partition(TopicPartition(topic, partition))
if partition_leader is not None and partition_leader > -1:
leader_tp[partition_leader][topic].update([partition])
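# Resulting shape (hypothetical values):
# leader_tp = {0: {'my_topic': set([0, 1])}, 1: {'my_topic': set([2])}}
# i.e. broker node id -> topic -> partitions that broker currently leads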

partitions_grouped_by_topic = defaultdict(list)

max_offsets = 1
for nodeId, tps in leader_tp.iteritems():
# Construct the OffsetRequest
request = OffsetRequest[0](
replica_id=-1,
topics=[
(topic, [
(partition, OffsetResetStrategy.LATEST, max_offsets) for partition in partitions])
for topic, partitions in tps.iteritems()])
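# Illustrative request for a single leader (hypothetical topic/partitions):
# OffsetRequest[0](replica_id=-1,
#                  topics=[('my_topic', [(0, OffsetResetStrategy.LATEST, 1),
#                                        (1, OffsetResetStrategy.LATEST, 1)])])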

response = self._make_blocking_req(cli, request, nodeid=nodeId)
offsets, unled = self._process_highwater_offsets(request, instance, nodeId, response)
highwater_offsets.update(offsets)
topic_partitions_without_a_leader.extend(unled)

return highwater_offsets, list(set(topic_partitions_without_a_leader))

def _report_consumer_metrics(self, highwater_offsets, consumer_offsets, unled_topic_partitions=[], tags=[]):
for (consumer_group, topic, partition), consumer_offset in consumer_offsets.iteritems():
consumer_group_tags = ['topic:%s' % topic, 'partition:%s' % partition,
'consumer_group:%s' % consumer_group] + tags
# Report the consumer group offsets and consumer lag
if (topic, partition) not in highwater_offsets:
self.log.warn("[%s] topic: %s partition: %s was not available in the consumer "
"- skipping consumer submission", consumer_group, topic, partition)
if (topic, partition) not in unled_topic_partitions:
self.log.warn("Consumer group: %s has offsets for topic: %s "
"partition: %s, but that topic partition doesn't actually "
"exist in the cluster.", consumer_group, topic, partition)
continue

consumer_group_tags = ['topic:%s' % topic, 'partition:%s' % partition,
'consumer_group:%s' % consumer_group] + tags
self.gauge('kafka.consumer_offset', consumer_offset, tags=consumer_group_tags)

consumer_lag = highwater_offsets[(topic, partition)] - consumer_offset
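# worked example (hypothetical values): highwater 120, committed offset 100 -> lag = 20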
if consumer_lag < 0:
# this will result in data loss, so emit an event for max visibility
@@ -435,6 +453,10 @@ def _get_zk_consumer_offsets(self, zk_hosts_ports, consumer_groups=None, zk_pref
return zk_consumer_offsets, consumer_groups

def _get_kafka_consumer_offsets(self, instance, consumer_groups):
"""
retrieve consumer offsets via the new consumer api. Offsets in this version are stored directly in kafka (__consumer_offsets topic)
rather than in zookeeper
"""
consumer_offsets = {}
topics = defaultdict(set)

@@ -456,13 +478,15 @@ def _get_kafka_consumer_offsets(self, instance, consumer_groups):
except Exception:
self.log.exception('Could not read consumer offsets from kafka.')

return topics, consumer_offsets
return consumer_offsets, topics

def _get_consumer_offsets(self, client, consumer_group, topic_partitions, coord_id=None):
# version = client.check_version(coord_id)

tps = defaultdict(set)
for topic, partitions in topic_partitions.iteritems():
if len(partitions) == 0:
partitions = client.cluster.available_partitions_for_topic(topic)
tps[unicode(topic)] = tps[unicode(topic)].union(set(partitions))

# TODO: find reliable way to decide what API version to use for
@@ -481,12 +505,15 @@ def _get_consumer_offsets(self, client, consumer_group, topic_partitions, coord_
return consumer_offsets

@staticmethod
def _read_config(instance, key):
def _read_config(instance, key, cast=None):
val = instance.get(key)
if val is None:
raise Exception('Must provide `%s` value in instance config' % key)

return val
if cast is None:
return val

return cast(val)
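# Usage sketch, mirroring the calls made in check() above:
#   kafka_conn_str = self._read_config(instance, 'kafka_connect_str')
#   groups = self._read_config(instance, 'consumer_groups',
#                              cast=self._validate_consumer_groups)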

def _validate_consumer_groups(self, val):
# val = {'consumer_group': {'topic': [0, 1]}}