Support Python 3 (#2648)
ofek authored Nov 27, 2018
1 parent 16afef3 commit 2cbd376
Showing 10 changed files with 281 additions and 434 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -135,7 +135,7 @@ jobs:
- stage: test
env: CHECK=istio
- stage: test
env: CHECK=kafka_consumer
env: CHECK=kafka_consumer PYTHON3=true
- stage: test
env: CHECK=kong
- stage: test
108 changes: 48 additions & 60 deletions kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
@@ -1,30 +1,26 @@
# (C) Datadog, Inc. 2010-2017
# (C) Datadog, Inc. 2018
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)

# stdlib
from collections import defaultdict
import random
from collections import defaultdict
from time import time, sleep

# 3p
from kafka import errors as kafka_errors
from kafka.client import KafkaClient
import kafka.errors as KafkaErrors
from kafka.structs import TopicPartition
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
from kafka.structs import TopicPartition
from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError
from six import iteritems, itervalues, string_types, text_type

# project
from datadog_checks.checks import AgentCheck
from datadog_checks.config import _is_affirmative
from datadog_checks.base import AgentCheck, is_affirmative

# Kafka Errors
KAFKA_NO_ERROR = KafkaErrors.NoError.errno
KAFKA_UNKNOWN_ERROR = KafkaErrors.UnknownError.errno
KAFKA_UNKNOWN_TOPIC_OR_PARTITION = KafkaErrors.UnknownTopicOrPartitionError.errno
KAFKA_NOT_LEADER_FOR_PARTITION = KafkaErrors.NotLeaderForPartitionError.errno
KAFKA_NO_ERROR = kafka_errors.NoError.errno
KAFKA_UNKNOWN_ERROR = kafka_errors.UnknownError.errno
KAFKA_UNKNOWN_TOPIC_OR_PARTITION = kafka_errors.UnknownTopicOrPartitionError.errno
KAFKA_NOT_LEADER_FOR_PARTITION = kafka_errors.NotLeaderForPartitionError.errno

DEFAULT_KAFKA_TIMEOUT = 5
DEFAULT_ZK_TIMEOUT = 5
@@ -48,14 +44,10 @@ class KafkaCheck(AgentCheck):

def __init__(self, name, init_config, agentConfig, instances=None):
AgentCheck.__init__(self, name, init_config, agentConfig, instances=instances)
self._zk_timeout = int(
init_config.get('zk_timeout', DEFAULT_ZK_TIMEOUT))
self._kafka_timeout = int(
init_config.get('kafka_timeout', DEFAULT_KAFKA_TIMEOUT))
self.context_limit = int(
init_config.get('max_partition_contexts', CONTEXT_UPPER_BOUND))
self._broker_retries = int(
init_config.get('kafka_retries', DEFAULT_KAFKA_RETRIES))
self._zk_timeout = int(init_config.get('zk_timeout', DEFAULT_ZK_TIMEOUT))
self._kafka_timeout = int(init_config.get('kafka_timeout', DEFAULT_KAFKA_TIMEOUT))
self.context_limit = int(init_config.get('max_partition_contexts', CONTEXT_UPPER_BOUND))
self._broker_retries = int(init_config.get('kafka_retries', DEFAULT_KAFKA_RETRIES))
self._zk_last_ts = {}

self.kafka_clients = {}
@@ -73,8 +65,7 @@ def check(self, instance):
zk_hosts_ports = instance.get('zk_connect_str')
zk_prefix = instance.get('zk_prefix', '')
zk_interval = int(instance.get('zk_iteration_ival', 0))
get_kafka_consumer_offsets = _is_affirmative(
instance.get('kafka_consumer_offsets', zk_hosts_ports is None))
get_kafka_consumer_offsets = is_affirmative(instance.get('kafka_consumer_offsets', zk_hosts_ports is None))

custom_tags = instance.get('tags', [])

@@ -87,8 +78,7 @@ def check(self, instance):
self._validate_explicit_consumer_groups(consumer_groups)

zk_consumer_offsets = None
if zk_hosts_ports and \
self._should_zk(zk_hosts_ports, zk_interval, get_kafka_consumer_offsets):
if zk_hosts_ports and self._should_zk(zk_hosts_ports, zk_interval, get_kafka_consumer_offsets):
zk_consumer_offsets, consumer_groups = self._get_zk_consumer_offsets(
zk_hosts_ports, consumer_groups, zk_prefix)

@@ -101,7 +91,7 @@ def check(self, instance):
if get_kafka_consumer_offsets:
# For now, consumer groups are mandatory if not using ZK
if not zk_hosts_ports and not consumer_groups:
raise BadKafkaConsumerConfiguration('Invalid configuration - if you\'re not collecting '
raise BadKafkaConsumerConfiguration('Invalid configuration - if you are not collecting '
'offsets from ZK you _must_ specify consumer groups')
# kafka-python automatically probes the cluster for broker version
# and then stores it. Note that this returns the first version
@@ -114,8 +104,8 @@ def check(self, instance):

if not topics:
# val = {'consumer_group': {'topic': [0, 1]}}
for _, tps in consumer_groups.iteritems():
for topic, partitions in tps.iteritems():
for _, tps in iteritems(consumer_groups):
for topic, partitions in iteritems(tps):
topics[topic].update(partitions)

warn_msg = """ Discovered %s partition contexts - this exceeds the maximum
@@ -137,7 +127,7 @@ def check(self, instance):
return

# Report the broker highwater offset
for (topic, partition), highwater_offset in highwater_offsets.iteritems():
for (topic, partition), highwater_offset in iteritems(highwater_offsets):
broker_tags = ['topic:%s' % topic, 'partition:%s' % partition] + custom_tags
self.gauge('kafka.broker_offset', highwater_offset, tags=broker_tags)

@@ -154,14 +144,13 @@ def stop(self):
cleanup kafka connections (to all brokers) to avoid leaving
stale connections in older kafkas.
"""
for cli in self.kafka_clients.itervalues():
for cli in itervalues(self.kafka_clients):
cli.close()

def _get_kafka_client(self, instance):
kafka_conn_str = instance.get('kafka_connect_str')
if not isinstance(kafka_conn_str, (basestring, list)):
raise BadKafkaConsumerConfiguration(
'kafka_connect_str should be string or list of strings')
if not isinstance(kafka_conn_str, (string_types, list)):
raise BadKafkaConsumerConfiguration('kafka_connect_str should be string or list of strings')

instance_key = tuple(kafka_conn_str) # cast to tuple in case it's a list
if instance_key not in self.kafka_clients:
@@ -196,7 +185,7 @@ def _ensure_ready_node(self, client, node_id):
client.poll(future=future)
if future.failed():
if future.retriable():
if isinstance(future.exception, KafkaErrors.NodeNotReadyError):
if isinstance(future.exception, kafka_errors.NodeNotReadyError):
self.log.debug("broker id: %s is not ready yet, sleeping for %f ms", node_id, delay * 10)
sleep(delay)
continue
@@ -238,7 +227,7 @@ def _get_group_coordinator(self, client, group):

return coord_id

def _process_highwater_offsets(self, request, instance, node_id, response):
def _process_highwater_offsets(self, response):
highwater_offsets = {}
topic_partitions_without_a_leader = []

@@ -292,32 +281,31 @@ def _get_broker_offsets(self, instance, topics):
topics_to_fetch = defaultdict(set)
cli = self._get_kafka_client(instance)

for topic, partitions in topics.iteritems():
for topic, partitions in iteritems(topics):
# if no partitions are provided
# we're falling back to all available partitions (?)
if len(partitions) == 0:
partitions = cli.cluster.available_partitions_for_topic(topic)
topics_to_fetch[topic].update(partitions)

leader_tp = defaultdict(lambda: defaultdict(set))
for topic, partitions in topics_to_fetch.iteritems():
for topic, partitions in iteritems(topics_to_fetch):
for partition in partitions:
partition_leader = cli.cluster.leader_for_partition(TopicPartition(topic, partition))
if partition_leader is not None and partition_leader >= 0:
leader_tp[partition_leader][topic].update([partition])
leader_tp[partition_leader][topic].add(partition)

max_offsets = 1
for node_id, tps in leader_tp.iteritems():
for node_id, tps in iteritems(leader_tp):
# Construct the OffsetRequest
request = OffsetRequest[0](
replica_id=-1,
topics=[
(topic, [
(partition, OffsetResetStrategy.LATEST, max_offsets) for partition in partitions])
for topic, partitions in tps.iteritems()])
(topic, [(partition, OffsetResetStrategy.LATEST, max_offsets) for partition in partitions])
for topic, partitions in iteritems(tps)])

response = self._make_blocking_req(cli, request, node_id=node_id)
offsets, unled = self._process_highwater_offsets(request, instance, node_id, response)
offsets, unled = self._process_highwater_offsets(response)
highwater_offsets.update(offsets)
topic_partitions_without_a_leader.extend(unled)

@@ -328,7 +316,7 @@ def _report_consumer_metrics(self, highwater_offsets, consumer_offsets, unled_to
unled_topic_partitions = []
if tags is None:
tags = []
for (consumer_group, topic, partition), consumer_offset in consumer_offsets.iteritems():
for (consumer_group, topic, partition), consumer_offset in iteritems(consumer_offsets):
# Report the consumer group offsets and consumer lag
if (topic, partition) not in highwater_offsets:
self.log.warn("[%s] topic: %s partition: %s was not available in the consumer "
@@ -399,21 +387,20 @@ def _get_zk_consumer_offsets(self, zk_hosts_ports, consumer_groups=None, zk_pref
consumer_groups = {consumer_group: None for consumer_group in
self._get_zk_path_children(zk_conn, zk_path_consumer, 'consumer groups')}

for consumer_group, topics in consumer_groups.iteritems():
for consumer_group, topics in iteritems(consumer_groups):
if topics is None:
# If topics are't specified, fetch them from ZK
zk_path_topics = zk_path_topic_tmpl.format(group=consumer_group)
topics = {topic: None for topic in
self._get_zk_path_children(zk_conn, zk_path_topics, 'topics')}
consumer_groups[consumer_group] = topics

for topic, partitions in topics.iteritems():
for topic, partitions in iteritems(topics):
if partitions is not None:
partitions = set(partitions) # defend against bad user input
else:
# If partitions aren't specified, fetch them from ZK
zk_path_partitions = zk_path_partition_tmpl.format(
group=consumer_group, topic=topic)
zk_path_partitions = zk_path_partition_tmpl.format(group=consumer_group, topic=topic)
# Zookeeper returns the partition IDs as strings because
# they are extracted from the node path
partitions = [int(x) for x in self._get_zk_path_children(
@@ -450,7 +437,7 @@ def _get_kafka_consumer_offsets(self, instance, consumer_groups):

cli = self._get_kafka_client(instance)

for consumer_group, topic_partitions in consumer_groups.iteritems():
for consumer_group, topic_partitions in iteritems(consumer_groups):
try:
coordinator_id = self._get_group_coordinator(cli, consumer_group)
if coordinator_id:
@@ -459,7 +446,7 @@ def _get_kafka_consumer_offsets(self, instance, consumer_groups):
offsets = self._get_consumer_offsets(cli, consumer_group, topic_partitions)
self.log.info("unable to find group coordinator for %s", consumer_group)

for (topic, partition), offset in offsets.iteritems():
for (topic, partition), offset in iteritems(offsets):
topics[topic].update([partition])
key = (consumer_group, topic, partition)
consumer_offsets[key] = offset
@@ -470,10 +457,10 @@ def _get_consumer_offsets(self, client, consumer_group, topic_partitions, coord_

def _get_consumer_offsets(self, client, consumer_group, topic_partitions, coord_id=None):
tps = defaultdict(set)
for topic, partitions in topic_partitions.iteritems():
for topic, partitions in iteritems(topic_partitions):
if len(partitions) == 0:
partitions = client.cluster.available_partitions_for_topic(topic)
tps[topic] = tps[unicode(topic)].union(set(partitions))
tps[topic] = tps[text_type(topic)].union(set(partitions))

consumer_offsets = {}
if coord_id is not None and coord_id >= 0:
@@ -485,7 +472,7 @@ def _get_consumer_offsets(self, client, consumer_group, topic_partitions, coord_
# Kafka protocol uses OffsetFetchRequests to retrieve consumer offsets:
# https://kafka.apache.org/protocol#The_Messages_OffsetFetch
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetFetchRequest
request = OffsetFetchRequest[1](consumer_group, list(tps.iteritems()))
request = OffsetFetchRequest[1](consumer_group, list(iteritems(tps)))
response = self._make_blocking_req(client, request, node_id=broker_id)
for (topic, partition_offsets) in response.topics:
for partition, offset, _, error_code in partition_offsets:
@@ -503,13 +490,14 @@ def _should_zk(self, zk_hosts_ports, interval, kafka_collect=False):
last = self._zk_last_ts.get(zk_hosts_ports, 0)

should_zk = False
if (now-last) >= interval:
if now - last >= interval:
self._zk_last_ts[zk_hosts_ports] = last
should_zk = True

return should_zk

def _validate_explicit_consumer_groups(self, val):
@classmethod
def _validate_explicit_consumer_groups(cls, val):
"""Validate any explicitly specified consumer groups.
While the check does not require specifying consumer groups,
@@ -518,13 +506,13 @@ def _validate_explicit_consumer_groups(self, val):
val = {'consumer_group': {'topic': [0, 1]}}
"""
assert isinstance(val, dict)
for consumer_group, topics in val.iteritems():
assert isinstance(consumer_group, basestring)
for consumer_group, topics in iteritems(val):
assert isinstance(consumer_group, string_types)
# topics are optional
assert isinstance(topics, dict) or topics is None
if topics is not None:
for topic, partitions in topics.iteritems():
assert isinstance(topic, basestring)
for topic, partitions in iteritems(topics):
assert isinstance(topic, string_types)
# partitions are optional
assert isinstance(partitions, (list, tuple)) or partitions is None
if partitions is not None:
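
The bulk of the kafka_consumer.py changes swap Python 2-only idioms (dict.iteritems/itervalues, basestring, unicode) for the six helpers the module now imports. A minimal standalone sketch of how those helpers behave on both interpreters; this is illustrative only, not the check's actual code:

    from collections import defaultdict

    from six import iteritems, itervalues, string_types, text_type

    offsets = {('marvel', 0): 25, ('marvel', 1): 40}

    # iteritems()/itervalues() map to dict.iteritems()/itervalues() on Python 2
    # and to dict.items()/values() on Python 3, so the same loop runs on both.
    for (topic, partition), offset in iteritems(offsets):
        print('%s/%s -> %s' % (topic, partition, offset))
    total = sum(itervalues(offsets))

    # string_types replaces the removed `basestring` builtin:
    # (str, unicode) on Python 2, (str,) on Python 3.
    assert isinstance('my_consumer_group', string_types)

    # text_type replaces the bare unicode() call dropped in _get_consumer_offsets:
    # unicode on Python 2, str on Python 3.
    tps = defaultdict(set)
    tps[text_type('marvel')].add(0)
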
2 changes: 1 addition & 1 deletion kafka_consumer/setup.py
@@ -24,7 +24,7 @@ def get_requirements(fpath):
return f.readlines()


CHECKS_BASE_REQ = 'datadog_checks_base'
CHECKS_BASE_REQ = 'datadog-checks-base>=4.2.0'

setup(
name='datadog-kafka_consumer',
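
Bumping CHECKS_BASE_REQ pins the integration to a datadog-checks-base release that provides the shared datadog_checks.base namespace used by the new AgentCheck and is_affirmative imports above. A rough sketch of how such a minimum-version pin is commonly wired into setuptools; the surrounding details are assumptions, not this file's exact contents:

    from setuptools import setup

    CHECKS_BASE_REQ = 'datadog-checks-base>=4.2.0'

    setup(
        name='datadog-kafka_consumer',
        # assumed wiring for illustration; the real setup.py also reads
        # additional requirements from a requirements file
        install_requires=[CHECKS_BASE_REQ],
    )
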
29 changes: 6 additions & 23 deletions kafka_consumer/tests/common.py
@@ -3,43 +3,26 @@
# Licensed under Simplified BSD License (see LICENSE)
import os
import socket
from distutils.version import LooseVersion

from datadog_checks.kafka_consumer import KafkaCheck
from datadog_checks.utils.common import get_docker_hostname
from datadog_checks.dev import get_docker_hostname

HERE = os.path.dirname(os.path.abspath(__file__))
HOST = get_docker_hostname()
HOST_IP = socket.gethostbyname(HOST)
LAST_ZKONLY_VERSION = (0, 8, 1, 1)
KAFKA_LEGACY = LooseVersion('0.8.2.0')
KAFKA_CONNECT_STR = '{}:9092'.format(HOST_IP)
ZK_CONNECT_STR = '{}:2181'.format(HOST)
TOPICS = ['marvel', 'dc', '__consumer_offsets']
PARTITIONS = [0, 1]


def is_supported(flavors):
def is_supported(flavor):
"""
Returns whether the current CI configuration is supported
"""
supported = False
version = os.environ.get('KAFKA_VERSION')
flavor = os.environ.get('KAFKA_OFFSETS_STORAGE', '').lower()

if not version:
if not os.environ.get('KAFKA_VERSION'):
return False

for f in flavors:
if f == flavor:
supported = True

if not supported:
if flavor != os.environ.get('KAFKA_OFFSETS_STORAGE'):
return False

if version is not 'latest':
version = version.split('-')[0]
version = tuple(s for s in version.split('.') if s.strip())
if flavor is 'kafka' and version <= KafkaCheck.LAST_ZKONLY_VERSION:
supported = False

return supported
return True
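
With the rewrite, is_supported takes a single flavor string, returns False when KAFKA_VERSION is unset, and otherwise compares the flavor directly against the KAFKA_OFFSETS_STORAGE environment variable. A hedged sketch of how a test module might use it to skip flavor-specific tests; the pytest wiring here is an assumption, not taken from this commit:

    import pytest

    from .common import is_supported

    # Skip every test in this module unless the CI environment is configured
    # for Kafka-based consumer offset storage.
    pytestmark = pytest.mark.skipif(
        not is_supported('kafka'),
        reason='kafka consumer offsets not supported in current environment',
    )
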