From 0dc4d3f3aab4b25b448aedfa18704cf7d6e9e8b7 Mon Sep 17 00:00:00 2001 From: Ofek Lev Date: Mon, 26 Nov 2018 22:11:53 -0500 Subject: [PATCH] Support Python 3 --- .travis.yml | 2 +- .../kafka_consumer/kafka_consumer.py | 108 ++++---- kafka_consumer/setup.py | 2 +- kafka_consumer/tests/common.py | 29 +- kafka_consumer/tests/conftest.py | 248 ++++-------------- .../docker-compose.yaml} | 0 kafka_consumer/tests/runners.py | 129 +++++++++ kafka_consumer/tests/test_kafka_consumer.py | 29 +- .../tests/test_kafka_consumer_zk.py | 67 ++--- kafka_consumer/tox.ini | 101 ++----- 10 files changed, 281 insertions(+), 434 deletions(-) rename kafka_consumer/tests/{docker-compose.yml => docker/docker-compose.yaml} (100%) create mode 100644 kafka_consumer/tests/runners.py diff --git a/.travis.yml b/.travis.yml index f85c324efdebc..aa7a5f9d74553 100644 --- a/.travis.yml +++ b/.travis.yml @@ -135,7 +135,7 @@ jobs: - stage: test env: CHECK=istio - stage: test - env: CHECK=kafka_consumer + env: CHECK=kafka_consumer PYTHON3=true - stage: test env: CHECK=kong - stage: test diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index 4bbbff4f664bb..4ba4e332bcdbf 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -1,30 +1,26 @@ -# (C) Datadog, Inc. 2010-2017 +# (C) Datadog, Inc. 2018 # All rights reserved # Licensed under Simplified BSD License (see LICENSE) - -# stdlib -from collections import defaultdict import random +from collections import defaultdict from time import time, sleep -# 3p +from kafka import errors as kafka_errors from kafka.client import KafkaClient -import kafka.errors as KafkaErrors +from kafka.structs import TopicPartition from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy -from kafka.structs import TopicPartition from kazoo.client import KazooClient from kazoo.exceptions import NoNodeError +from six import iteritems, itervalues, string_types, text_type -# project -from datadog_checks.checks import AgentCheck -from datadog_checks.config import _is_affirmative +from datadog_checks.base import AgentCheck, is_affirmative # Kafka Errors -KAFKA_NO_ERROR = KafkaErrors.NoError.errno -KAFKA_UNKNOWN_ERROR = KafkaErrors.UnknownError.errno -KAFKA_UNKNOWN_TOPIC_OR_PARTITION = KafkaErrors.UnknownTopicOrPartitionError.errno -KAFKA_NOT_LEADER_FOR_PARTITION = KafkaErrors.NotLeaderForPartitionError.errno +KAFKA_NO_ERROR = kafka_errors.NoError.errno +KAFKA_UNKNOWN_ERROR = kafka_errors.UnknownError.errno +KAFKA_UNKNOWN_TOPIC_OR_PARTITION = kafka_errors.UnknownTopicOrPartitionError.errno +KAFKA_NOT_LEADER_FOR_PARTITION = kafka_errors.NotLeaderForPartitionError.errno DEFAULT_KAFKA_TIMEOUT = 5 DEFAULT_ZK_TIMEOUT = 5 @@ -48,14 +44,10 @@ class KafkaCheck(AgentCheck): def __init__(self, name, init_config, agentConfig, instances=None): AgentCheck.__init__(self, name, init_config, agentConfig, instances=instances) - self._zk_timeout = int( - init_config.get('zk_timeout', DEFAULT_ZK_TIMEOUT)) - self._kafka_timeout = int( - init_config.get('kafka_timeout', DEFAULT_KAFKA_TIMEOUT)) - self.context_limit = int( - init_config.get('max_partition_contexts', CONTEXT_UPPER_BOUND)) - self._broker_retries = int( - init_config.get('kafka_retries', DEFAULT_KAFKA_RETRIES)) + self._zk_timeout = int(init_config.get('zk_timeout', DEFAULT_ZK_TIMEOUT)) + self._kafka_timeout = int(init_config.get('kafka_timeout', DEFAULT_KAFKA_TIMEOUT)) + self.context_limit = int(init_config.get('max_partition_contexts', CONTEXT_UPPER_BOUND)) + self._broker_retries = int(init_config.get('kafka_retries', DEFAULT_KAFKA_RETRIES)) self._zk_last_ts = {} self.kafka_clients = {} @@ -73,8 +65,7 @@ def check(self, instance): zk_hosts_ports = instance.get('zk_connect_str') zk_prefix = instance.get('zk_prefix', '') zk_interval = int(instance.get('zk_iteration_ival', 0)) - get_kafka_consumer_offsets = _is_affirmative( - instance.get('kafka_consumer_offsets', zk_hosts_ports is None)) + get_kafka_consumer_offsets = is_affirmative(instance.get('kafka_consumer_offsets', zk_hosts_ports is None)) custom_tags = instance.get('tags', []) @@ -87,8 +78,7 @@ def check(self, instance): self._validate_explicit_consumer_groups(consumer_groups) zk_consumer_offsets = None - if zk_hosts_ports and \ - self._should_zk(zk_hosts_ports, zk_interval, get_kafka_consumer_offsets): + if zk_hosts_ports and self._should_zk(zk_hosts_ports, zk_interval, get_kafka_consumer_offsets): zk_consumer_offsets, consumer_groups = self._get_zk_consumer_offsets( zk_hosts_ports, consumer_groups, zk_prefix) @@ -101,7 +91,7 @@ def check(self, instance): if get_kafka_consumer_offsets: # For now, consumer groups are mandatory if not using ZK if not zk_hosts_ports and not consumer_groups: - raise BadKafkaConsumerConfiguration('Invalid configuration - if you\'re not collecting ' + raise BadKafkaConsumerConfiguration('Invalid configuration - if you are not collecting ' 'offsets from ZK you _must_ specify consumer groups') # kafka-python automatically probes the cluster for broker version # and then stores it. Note that this returns the first version @@ -114,8 +104,8 @@ def check(self, instance): if not topics: # val = {'consumer_group': {'topic': [0, 1]}} - for _, tps in consumer_groups.iteritems(): - for topic, partitions in tps.iteritems(): + for _, tps in iteritems(consumer_groups): + for topic, partitions in iteritems(tps): topics[topic].update(partitions) warn_msg = """ Discovered %s partition contexts - this exceeds the maximum @@ -137,7 +127,7 @@ def check(self, instance): return # Report the broker highwater offset - for (topic, partition), highwater_offset in highwater_offsets.iteritems(): + for (topic, partition), highwater_offset in iteritems(highwater_offsets): broker_tags = ['topic:%s' % topic, 'partition:%s' % partition] + custom_tags self.gauge('kafka.broker_offset', highwater_offset, tags=broker_tags) @@ -154,14 +144,13 @@ def stop(self): cleanup kafka connections (to all brokers) to avoid leaving stale connections in older kafkas. """ - for cli in self.kafka_clients.itervalues(): + for cli in itervalues(self.kafka_clients): cli.close() def _get_kafka_client(self, instance): kafka_conn_str = instance.get('kafka_connect_str') - if not isinstance(kafka_conn_str, (basestring, list)): - raise BadKafkaConsumerConfiguration( - 'kafka_connect_str should be string or list of strings') + if not isinstance(kafka_conn_str, (string_types, list)): + raise BadKafkaConsumerConfiguration('kafka_connect_str should be string or list of strings') instance_key = tuple(kafka_conn_str) # cast to tuple in case it's a list if instance_key not in self.kafka_clients: @@ -196,7 +185,7 @@ def _ensure_ready_node(self, client, node_id): client.poll(future=future) if future.failed(): if future.retriable(): - if isinstance(future.exception, KafkaErrors.NodeNotReadyError): + if isinstance(future.exception, kafka_errors.NodeNotReadyError): self.log.debug("broker id: %s is not ready yet, sleeping for %f ms", node_id, delay * 10) sleep(delay) continue @@ -238,7 +227,7 @@ def _get_group_coordinator(self, client, group): return coord_id - def _process_highwater_offsets(self, request, instance, node_id, response): + def _process_highwater_offsets(self, response): highwater_offsets = {} topic_partitions_without_a_leader = [] @@ -292,7 +281,7 @@ def _get_broker_offsets(self, instance, topics): topics_to_fetch = defaultdict(set) cli = self._get_kafka_client(instance) - for topic, partitions in topics.iteritems(): + for topic, partitions in iteritems(topics): # if no partitions are provided # we're falling back to all available partitions (?) if len(partitions) == 0: @@ -300,24 +289,23 @@ def _get_broker_offsets(self, instance, topics): topics_to_fetch[topic].update(partitions) leader_tp = defaultdict(lambda: defaultdict(set)) - for topic, partitions in topics_to_fetch.iteritems(): + for topic, partitions in iteritems(topics_to_fetch): for partition in partitions: partition_leader = cli.cluster.leader_for_partition(TopicPartition(topic, partition)) if partition_leader is not None and partition_leader >= 0: - leader_tp[partition_leader][topic].update([partition]) + leader_tp[partition_leader][topic].add(partition) max_offsets = 1 - for node_id, tps in leader_tp.iteritems(): + for node_id, tps in iteritems(leader_tp): # Construct the OffsetRequest request = OffsetRequest[0]( replica_id=-1, topics=[ - (topic, [ - (partition, OffsetResetStrategy.LATEST, max_offsets) for partition in partitions]) - for topic, partitions in tps.iteritems()]) + (topic, [(partition, OffsetResetStrategy.LATEST, max_offsets) for partition in partitions]) + for topic, partitions in iteritems(tps)]) response = self._make_blocking_req(cli, request, node_id=node_id) - offsets, unled = self._process_highwater_offsets(request, instance, node_id, response) + offsets, unled = self._process_highwater_offsets(response) highwater_offsets.update(offsets) topic_partitions_without_a_leader.extend(unled) @@ -328,7 +316,7 @@ def _report_consumer_metrics(self, highwater_offsets, consumer_offsets, unled_to unled_topic_partitions = [] if tags is None: tags = [] - for (consumer_group, topic, partition), consumer_offset in consumer_offsets.iteritems(): + for (consumer_group, topic, partition), consumer_offset in iteritems(consumer_offsets): # Report the consumer group offsets and consumer lag if (topic, partition) not in highwater_offsets: self.log.warn("[%s] topic: %s partition: %s was not available in the consumer " @@ -399,7 +387,7 @@ def _get_zk_consumer_offsets(self, zk_hosts_ports, consumer_groups=None, zk_pref consumer_groups = {consumer_group: None for consumer_group in self._get_zk_path_children(zk_conn, zk_path_consumer, 'consumer groups')} - for consumer_group, topics in consumer_groups.iteritems(): + for consumer_group, topics in iteritems(consumer_groups): if topics is None: # If topics are't specified, fetch them from ZK zk_path_topics = zk_path_topic_tmpl.format(group=consumer_group) @@ -407,13 +395,12 @@ def _get_zk_consumer_offsets(self, zk_hosts_ports, consumer_groups=None, zk_pref self._get_zk_path_children(zk_conn, zk_path_topics, 'topics')} consumer_groups[consumer_group] = topics - for topic, partitions in topics.iteritems(): + for topic, partitions in iteritems(topics): if partitions is not None: partitions = set(partitions) # defend against bad user input else: # If partitions aren't specified, fetch them from ZK - zk_path_partitions = zk_path_partition_tmpl.format( - group=consumer_group, topic=topic) + zk_path_partitions = zk_path_partition_tmpl.format(group=consumer_group, topic=topic) # Zookeeper returns the partition IDs as strings because # they are extracted from the node path partitions = [int(x) for x in self._get_zk_path_children( @@ -450,7 +437,7 @@ def _get_kafka_consumer_offsets(self, instance, consumer_groups): cli = self._get_kafka_client(instance) - for consumer_group, topic_partitions in consumer_groups.iteritems(): + for consumer_group, topic_partitions in iteritems(consumer_groups): try: coordinator_id = self._get_group_coordinator(cli, consumer_group) if coordinator_id: @@ -459,7 +446,7 @@ def _get_kafka_consumer_offsets(self, instance, consumer_groups): offsets = self._get_consumer_offsets(cli, consumer_group, topic_partitions) self.log.info("unable to find group coordinator for %s", consumer_group) - for (topic, partition), offset in offsets.iteritems(): + for (topic, partition), offset in iteritems(offsets): topics[topic].update([partition]) key = (consumer_group, topic, partition) consumer_offsets[key] = offset @@ -470,10 +457,10 @@ def _get_kafka_consumer_offsets(self, instance, consumer_groups): def _get_consumer_offsets(self, client, consumer_group, topic_partitions, coord_id=None): tps = defaultdict(set) - for topic, partitions in topic_partitions.iteritems(): + for topic, partitions in iteritems(topic_partitions): if len(partitions) == 0: partitions = client.cluster.available_partitions_for_topic(topic) - tps[topic] = tps[unicode(topic)].union(set(partitions)) + tps[topic] = tps[text_type(topic)].union(set(partitions)) consumer_offsets = {} if coord_id is not None and coord_id >= 0: @@ -485,7 +472,7 @@ def _get_consumer_offsets(self, client, consumer_group, topic_partitions, coord_ # Kafka protocol uses OffsetFetchRequests to retrieve consumer offsets: # https://kafka.apache.org/protocol#The_Messages_OffsetFetch # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetFetchRequest - request = OffsetFetchRequest[1](consumer_group, list(tps.iteritems())) + request = OffsetFetchRequest[1](consumer_group, list(iteritems(tps))) response = self._make_blocking_req(client, request, node_id=broker_id) for (topic, partition_offsets) in response.topics: for partition, offset, _, error_code in partition_offsets: @@ -503,13 +490,14 @@ def _should_zk(self, zk_hosts_ports, interval, kafka_collect=False): last = self._zk_last_ts.get(zk_hosts_ports, 0) should_zk = False - if (now-last) >= interval: + if now - last >= interval: self._zk_last_ts[zk_hosts_ports] = last should_zk = True return should_zk - def _validate_explicit_consumer_groups(self, val): + @classmethod + def _validate_explicit_consumer_groups(cls, val): """Validate any explicitly specified consumer groups. While the check does not require specifying consumer groups, @@ -518,13 +506,13 @@ def _validate_explicit_consumer_groups(self, val): val = {'consumer_group': {'topic': [0, 1]}} """ assert isinstance(val, dict) - for consumer_group, topics in val.iteritems(): - assert isinstance(consumer_group, basestring) + for consumer_group, topics in iteritems(val): + assert isinstance(consumer_group, string_types) # topics are optional assert isinstance(topics, dict) or topics is None if topics is not None: - for topic, partitions in topics.iteritems(): - assert isinstance(topic, basestring) + for topic, partitions in iteritems(topics): + assert isinstance(topic, string_types) # partitions are optional assert isinstance(partitions, (list, tuple)) or partitions is None if partitions is not None: diff --git a/kafka_consumer/setup.py b/kafka_consumer/setup.py index a7d338593b6e8..b84a3b37e57cd 100644 --- a/kafka_consumer/setup.py +++ b/kafka_consumer/setup.py @@ -24,7 +24,7 @@ def get_requirements(fpath): return f.readlines() -CHECKS_BASE_REQ = 'datadog_checks_base' +CHECKS_BASE_REQ = 'datadog-checks-base>=4.2.0' setup( name='datadog-kafka_consumer', diff --git a/kafka_consumer/tests/common.py b/kafka_consumer/tests/common.py index 05f71ffd40d62..1e2abd7d919f5 100644 --- a/kafka_consumer/tests/common.py +++ b/kafka_consumer/tests/common.py @@ -3,43 +3,26 @@ # Licensed under Simplified BSD License (see LICENSE) import os import socket -from distutils.version import LooseVersion -from datadog_checks.kafka_consumer import KafkaCheck -from datadog_checks.utils.common import get_docker_hostname +from datadog_checks.dev import get_docker_hostname +HERE = os.path.dirname(os.path.abspath(__file__)) HOST = get_docker_hostname() HOST_IP = socket.gethostbyname(HOST) -LAST_ZKONLY_VERSION = (0, 8, 1, 1) -KAFKA_LEGACY = LooseVersion('0.8.2.0') KAFKA_CONNECT_STR = '{}:9092'.format(HOST_IP) ZK_CONNECT_STR = '{}:2181'.format(HOST) TOPICS = ['marvel', 'dc', '__consumer_offsets'] PARTITIONS = [0, 1] -def is_supported(flavors): +def is_supported(flavor): """ Returns whether the current CI configuration is supported """ - supported = False - version = os.environ.get('KAFKA_VERSION') - flavor = os.environ.get('KAFKA_OFFSETS_STORAGE', '').lower() - - if not version: + if not os.environ.get('KAFKA_VERSION'): return False - for f in flavors: - if f == flavor: - supported = True - - if not supported: + if flavor != os.environ.get('KAFKA_OFFSETS_STORAGE'): return False - if version is not 'latest': - version = version.split('-')[0] - version = tuple(s for s in version.split('.') if s.strip()) - if flavor is 'kafka' and version <= KafkaCheck.LAST_ZKONLY_VERSION: - supported = False - - return supported + return True diff --git a/kafka_consumer/tests/conftest.py b/kafka_consumer/tests/conftest.py index 0f3feee7554d5..268e5903bd3f6 100644 --- a/kafka_consumer/tests/conftest.py +++ b/kafka_consumer/tests/conftest.py @@ -1,219 +1,64 @@ # (C) Datadog, Inc. 2018 # All rights reserved # Licensed under Simplified BSD License (see LICENSE) -import subprocess import os -import time -import threading -import sys import pytest -from kafka import KafkaConsumer, KafkaProducer -from kazoo.client import KazooClient +from kafka import KafkaConsumer -from .common import ZK_CONNECT_STR, KAFKA_CONNECT_STR, PARTITIONS, TOPICS, HOST_IP +from datadog_checks.dev import WaitFor, docker_run +from .common import HERE, HOST_IP, KAFKA_CONNECT_STR, PARTITIONS, TOPICS, ZK_CONNECT_STR +from .runners import Producer, KConsumer, ZKConsumer -HERE = os.path.dirname(os.path.abspath(__file__)) +def find_topics(): + consumer = KafkaConsumer(bootstrap_servers=KAFKA_CONNECT_STR, request_timeout_ms=1000) + topics = consumer.topics() + # We expect to find 2 topics: `marvel` and `dc` + return len(topics) == 2 -class StoppableThread(threading.Thread): - def __init__(self): - self._shutdown_event = threading.Event() - super(StoppableThread, self).__init__() - - def send_shutdown(self): - self._shutdown_event.set() - - -class Producer(StoppableThread): - - def run(self): - producer = KafkaProducer(bootstrap_servers=KAFKA_CONNECT_STR) - - iteration = 0 - while not self._shutdown_event.is_set(): - for partition in PARTITIONS: - try: - producer.send('marvel', b"Peter Parker", partition=partition) - producer.send('marvel', b"Bruce Banner", partition=partition) - producer.send('marvel', b"Tony Stark", partition=partition) - producer.send('marvel', b"Johhny Blaze", partition=partition) - producer.send('marvel', b"\xc2BoomShakalaka", partition=partition) - producer.send('dc', b"Diana Prince", partition=partition) - producer.send('dc', b"Bruce Wayne", partition=partition) - producer.send('dc', b"Clark Kent", partition=partition) - producer.send('dc', b"Arthur Curry", partition=partition) - producer.send('dc', b"\xc2ShakalakaBoom", partition=partition) - except Exception: - pass - - iteration += 1 - time.sleep(1) - - -class ZKConsumer(StoppableThread): - - def __init__(self, topics, partitions): - self.zk_connect_str = ZK_CONNECT_STR - self.kafka_connect_str = KAFKA_CONNECT_STR - self.topics = topics - self.partitions = partitions - super(ZKConsumer, self).__init__() - - def run(self): - zk_path_topic_tmpl = '/consumers/my_consumer/offsets/' - zk_path_partition_tmpl = zk_path_topic_tmpl + '{topic}/{partition}' - - zk_conn = KazooClient(self.zk_connect_str, timeout=10) - zk_conn.start() - - for topic in self.topics: - for partition in self.partitions: - node_path = zk_path_partition_tmpl.format(topic=topic, partition=partition) - node = zk_conn.exists(node_path) - if not node: - zk_conn.ensure_path(node_path) - zk_conn.set(node_path, str(0)) - - consumer = KafkaConsumer(bootstrap_servers=[self.kafka_connect_str], - group_id="my_consumer", - auto_offset_reset='earliest', - enable_auto_commit=False) - consumer.subscribe(self.topics) - - iteration = 0 - while not self._shutdown_event.is_set(): - response = consumer.poll(timeout_ms=500, max_records=10) - zk_trans = zk_conn.transaction() - for tp, records in response.iteritems(): - topic = tp.topic - partition = tp.partition +@pytest.fixture(scope='session') +def dd_environment(e2e_instance): + """ + Start a kafka cluster and wait for it to be up and running. + """ + with docker_run( + os.path.join(HERE, 'docker', 'docker-compose.yaml'), + conditions=[WaitFor(find_topics, attempts=30, wait=3)], + env_vars={ + # Advertising the hostname doesn't work on docker:dind so we manually + # resolve the IP address. This seems to also work outside docker:dind + # so we got that goin for us. + 'KAFKA_HOST': HOST_IP, + }, + ): + yield e2e_instance - offset = None - for record in records: - if offset is None or record.offset > offset: - offset = record.offset - if offset: - zk_trans.set_data( - os.path.join(zk_path_topic_tmpl.format(topic), str(partition)), - str(offset) - ) +@pytest.fixture(scope='session') +@pytest.mark.usefixtures('dd_environment') +def kafka_producer(): + with Producer(): + yield - zk_trans.commit() - iteration += 1 - zk_conn.stop() +@pytest.fixture(scope='session') +@pytest.mark.usefixtures('dd_environment') +def kafka_consumer(): + with KConsumer(TOPICS): + yield -class KConsumer(StoppableThread): +@pytest.fixture(scope='session') +@pytest.mark.usefixtures('dd_environment') +def zk_consumer(): + with ZKConsumer(TOPICS, PARTITIONS): + yield - def __init__(self, topics): - self.kafka_connect_str = KAFKA_CONNECT_STR - self.topics = topics - super(KConsumer, self).__init__() - def run(self): - consumer = KafkaConsumer(bootstrap_servers=self.kafka_connect_str, - group_id="my_consumer", - auto_offset_reset='earliest') - consumer.subscribe(self.topics) - - iteration = 0 - while not self._shutdown_event.is_set(): - consumer.poll(timeout_ms=500, max_records=10) - iteration += 1 - - -@pytest.fixture(scope="session") -def kafka_cluster(): - """ - Start a kafka cluster and wait for it to be up and running. - """ - env = os.environ - # Advertising the hostname doesn't work on docker:dind so we manually - # resolve the IP address. This seems to also work outside docker:dind - # so we got that goin for us. - env['KAFKA_HOST'] = HOST_IP - - args = [ - "docker-compose", - "-f", os.path.join(HERE, 'docker-compose.yml') - ] - - subprocess.check_call(args + ["up", "-d"], env=env) - - # wait for Kafka to be up and running - attempts = 0 - while True: - # This is useful to debug Kafka booting and not too verbose when - # everything runs smooth, let's leave it here - sys.stderr.write("Attempt number {}\n".format(attempts+1)) - - # this brings a total of 90s to timeout - if attempts >= 30: - # print the whole compose log in case of timeout to help diagnose - subprocess.check_call(args + ["logs"], env=env) - subprocess.check_call(args + ["down"], env=env) - raise Exception("Kafka boot timed out!") - - try: - consumer = KafkaConsumer(bootstrap_servers=KAFKA_CONNECT_STR) - topics = consumer.topics() - sys.stderr.write("Got topics: {}\n".format(topics)) - except Exception as e: - sys.stderr.write(str(e)+'\n') - topics = {} - - # we expect to find 2 topics, "dc" and "marvel" - if len(topics) == 2: - break - - attempts += 1 - time.sleep(3) - - yield - - subprocess.check_call(args + ["down"], env=env) - - -@pytest.fixture(scope="session") -def kafka_producer(kafka_cluster): - producer = Producer() - yield producer - if producer.is_alive(): - producer.send_shutdown() - producer.join(5) - - -@pytest.fixture(scope="session") -def kafka_consumer(kafka_producer): - consumer = KConsumer(TOPICS) - yield consumer - if consumer.is_alive(): - consumer.send_shutdown() - consumer.join(5) - - -@pytest.fixture(scope="session") -def zk_consumer(kafka_producer): - consumer = ZKConsumer(TOPICS, PARTITIONS) - yield consumer - if consumer.is_alive(): - consumer.send_shutdown() - consumer.join(5) - - -@pytest.fixture -def aggregator(): - from datadog_checks.stubs import aggregator - aggregator.reset() - return aggregator - - -@pytest.fixture +@pytest.fixture(scope='session') def zk_instance(): return { 'kafka_connect_str': KAFKA_CONNECT_STR, @@ -226,7 +71,7 @@ def zk_instance(): } -@pytest.fixture +@pytest.fixture(scope='session') def kafka_instance(): return { 'kafka_connect_str': KAFKA_CONNECT_STR, @@ -238,3 +83,12 @@ def kafka_instance(): } } } + + +@pytest.fixture(scope='session') +def e2e_instance(kafka_instance, zk_instance): + flavor = os.environ.get('KAFKA_OFFSETS_STORAGE') + if flavor == 'kafka': + return kafka_instance + elif flavor == 'zookeeper': + return zk_instance diff --git a/kafka_consumer/tests/docker-compose.yml b/kafka_consumer/tests/docker/docker-compose.yaml similarity index 100% rename from kafka_consumer/tests/docker-compose.yml rename to kafka_consumer/tests/docker/docker-compose.yaml diff --git a/kafka_consumer/tests/runners.py b/kafka_consumer/tests/runners.py new file mode 100644 index 0000000000000..b8257e2c3da1f --- /dev/null +++ b/kafka_consumer/tests/runners.py @@ -0,0 +1,129 @@ +# (C) Datadog, Inc. 2018 +# All rights reserved +# Licensed under Simplified BSD License (see LICENSE) +import os +import threading +import time + +from kafka import KafkaConsumer, KafkaProducer +from kazoo.client import KazooClient +from six import iteritems + +from .common import KAFKA_CONNECT_STR, PARTITIONS, ZK_CONNECT_STR + +DEFAULT_SLEEP = 5 +DEFAULT_TIMEOUT = 5 + + +class StoppableThread(threading.Thread): + def __init__(self, sleep=DEFAULT_SLEEP, timeout=DEFAULT_TIMEOUT): + super(StoppableThread, self).__init__() + self._shutdown_event = threading.Event() + self._sleep = sleep + self._timeout = timeout + + def __enter__(self): + self.start() + time.sleep(self._sleep) + return self + + def __exit__(self, *args, **kwargs): + self._shutdown_event.set() + self.join(self._timeout) + + +class Producer(StoppableThread): + def run(self): + producer = KafkaProducer(bootstrap_servers=KAFKA_CONNECT_STR) + + iteration = 0 + while not self._shutdown_event.is_set(): + for partition in PARTITIONS: + try: + producer.send('marvel', b"Peter Parker", partition=partition) + producer.send('marvel', b"Bruce Banner", partition=partition) + producer.send('marvel', b"Tony Stark", partition=partition) + producer.send('marvel', b"Johhny Blaze", partition=partition) + producer.send('marvel', b"\xc2BoomShakalaka", partition=partition) + producer.send('dc', b"Diana Prince", partition=partition) + producer.send('dc', b"Bruce Wayne", partition=partition) + producer.send('dc', b"Clark Kent", partition=partition) + producer.send('dc', b"Arthur Curry", partition=partition) + producer.send('dc', b"\xc2ShakalakaBoom", partition=partition) + except Exception: + pass + + iteration += 1 + time.sleep(1) + + +class KConsumer(StoppableThread): + def __init__(self, topics, sleep=DEFAULT_SLEEP, timeout=DEFAULT_TIMEOUT): + super(KConsumer, self).__init__(sleep=sleep, timeout=timeout) + self.kafka_connect_str = KAFKA_CONNECT_STR + self.topics = topics + + def run(self): + consumer = KafkaConsumer(bootstrap_servers=self.kafka_connect_str, + group_id="my_consumer", + auto_offset_reset='earliest') + consumer.subscribe(self.topics) + + iteration = 0 + while not self._shutdown_event.is_set(): + consumer.poll(timeout_ms=500, max_records=10) + iteration += 1 + + +class ZKConsumer(StoppableThread): + def __init__(self, topics, partitions, sleep=DEFAULT_SLEEP, timeout=DEFAULT_TIMEOUT): + super(ZKConsumer, self).__init__(sleep=sleep, timeout=timeout) + self.zk_connect_str = ZK_CONNECT_STR + self.kafka_connect_str = KAFKA_CONNECT_STR + self.topics = topics + self.partitions = partitions + + def run(self): + zk_path_topic_tmpl = '/consumers/my_consumer/offsets/' + zk_path_partition_tmpl = zk_path_topic_tmpl + '{topic}/{partition}' + + zk_conn = KazooClient(self.zk_connect_str, timeout=10) + zk_conn.start() + + for topic in self.topics: + for partition in self.partitions: + node_path = zk_path_partition_tmpl.format(topic=topic, partition=partition) + node = zk_conn.exists(node_path) + if not node: + zk_conn.ensure_path(node_path) + zk_conn.set(node_path, str(0)) + + consumer = KafkaConsumer(bootstrap_servers=[self.kafka_connect_str], + group_id="my_consumer", + auto_offset_reset='earliest', + enable_auto_commit=False) + consumer.subscribe(self.topics) + + iteration = 0 + while not self._shutdown_event.is_set(): + response = consumer.poll(timeout_ms=500, max_records=10) + zk_trans = zk_conn.transaction() + for tp, records in iteritems(response): + topic = tp.topic + partition = tp.partition + + offset = None + for record in records: + if offset is None or record.offset > offset: + offset = record.offset + + if offset: + zk_trans.set_data( + os.path.join(zk_path_topic_tmpl.format(topic), str(partition)), + str(offset) + ) + + zk_trans.commit() + iteration += 1 + + zk_conn.stop() diff --git a/kafka_consumer/tests/test_kafka_consumer.py b/kafka_consumer/tests/test_kafka_consumer.py index acf7459093e24..a790787c24c63 100644 --- a/kafka_consumer/tests/test_kafka_consumer.py +++ b/kafka_consumer/tests/test_kafka_consumer.py @@ -1,12 +1,16 @@ # (C) Datadog, Inc. 2018 # All rights reserved # Licensed under Simplified BSD License (see LICENSE) -import time - import pytest +from six import iteritems -from .common import is_supported from datadog_checks.kafka_consumer import KafkaCheck +from .common import is_supported + +pytestmark = pytest.mark.skipif( + not is_supported('kafka'), + reason='kafka consumer offsets not supported in current environment' +) BROKER_METRICS = [ @@ -19,27 +23,16 @@ ] -@pytest.mark.kafka -def test_check_kafka(kafka_cluster, kafka_producer, kafka_consumer, kafka_instance, aggregator): +@pytest.mark.usefixtures('dd_environment', 'kafka_consumer', 'kafka_producer') +def test_check_kafka(aggregator, kafka_instance): """ Testing Kafka_consumer check. """ - if not is_supported(['kafka']): - pytest.skip("kafka consumer offsets not supported in current environment") - - if not kafka_producer.is_alive(): - kafka_producer.start() - time.sleep(5) - - if not kafka_consumer.is_alive(): - kafka_consumer.start() - time.sleep(5) - kafka_consumer_check = KafkaCheck('kafka_consumer', {}, {}) kafka_consumer_check.check(kafka_instance) - for name, consumer_group in kafka_instance['consumer_groups'].iteritems(): - for topic, partitions in consumer_group.iteritems(): + for name, consumer_group in iteritems(kafka_instance['consumer_groups']): + for topic, partitions in iteritems(consumer_group): for partition in partitions: tags = ["topic:{}".format(topic), "partition:{}".format(partition)] + ['optional:tag1'] diff --git a/kafka_consumer/tests/test_kafka_consumer_zk.py b/kafka_consumer/tests/test_kafka_consumer_zk.py index 5bd16e294c917..be5c496249c75 100644 --- a/kafka_consumer/tests/test_kafka_consumer_zk.py +++ b/kafka_consumer/tests/test_kafka_consumer_zk.py @@ -2,12 +2,17 @@ # All rights reserved # Licensed under Simplified BSD License (see LICENSE) import copy -import time import pytest +from six import iteritems -from .common import is_supported, PARTITIONS, TOPICS, HOST from datadog_checks.kafka_consumer import KafkaCheck +from .common import HOST, PARTITIONS, TOPICS, is_supported + +pytestmark = pytest.mark.skipif( + not is_supported('zookeeper'), + reason='zookeeper consumer offsets not supported in current environment' +) BROKER_METRICS = [ @@ -20,27 +25,16 @@ ] -@pytest.mark.zookeeper -def test_check_zk(kafka_cluster, kafka_producer, zk_consumer, zk_instance, aggregator): +@pytest.mark.usefixtures('dd_environment', 'kafka_producer', 'zk_consumer') +def test_check_zk(aggregator, zk_instance): """ Testing Kafka_consumer check. """ - if not is_supported(['zookeeper']): - pytest.skip("zookeeper consumer offsets not supported in current environment") - - if not kafka_producer.is_alive(): - kafka_producer.start() - time.sleep(5) - - if not zk_consumer.is_alive(): - zk_consumer.start() - time.sleep(5) - kafka_consumer_check = KafkaCheck('kafka_consumer', {}, {}) kafka_consumer_check.check(zk_instance) - for name, consumer_group in zk_instance['consumer_groups'].iteritems(): - for topic, partitions in consumer_group.iteritems(): + for name, consumer_group in iteritems(zk_instance['consumer_groups']): + for topic, partitions in iteritems(consumer_group): for partition in partitions: tags = ["topic:{}".format(topic), "partition:{}".format(partition)] @@ -54,22 +48,11 @@ def test_check_zk(kafka_cluster, kafka_producer, zk_consumer, zk_instance, aggre aggregator.assert_metric('kafka.broker_offset', at_least=1) -@pytest.mark.zookeeper -def test_multiple_servers_zk(kafka_cluster, kafka_producer, zk_consumer, zk_instance, aggregator): +@pytest.mark.usefixtures('dd_environment', 'kafka_producer', 'zk_consumer') +def test_multiple_servers_zk(aggregator, zk_instance): """ Testing Kafka_consumer check. """ - if not is_supported(['zookeeper']): - pytest.skip("zookeeper consumer offsets not supported in current environment") - - if not kafka_producer.is_alive(): - kafka_producer.start() - time.sleep(5) - - if not zk_consumer.is_alive(): - zk_consumer.start() - time.sleep(5) - multiple_server_zk_instance = copy.deepcopy(zk_instance) multiple_server_zk_instance['kafka_connect_str'] = [ multiple_server_zk_instance['kafka_connect_str'], @@ -78,8 +61,8 @@ def test_multiple_servers_zk(kafka_cluster, kafka_producer, zk_consumer, zk_inst kafka_consumer_check = KafkaCheck('kafka_consumer', {}, {}) kafka_consumer_check.check(multiple_server_zk_instance) - for name, consumer_group in multiple_server_zk_instance['consumer_groups'].iteritems(): - for topic, partitions in consumer_group.iteritems(): + for name, consumer_group in iteritems(multiple_server_zk_instance['consumer_groups']): + for topic, partitions in iteritems(consumer_group): for partition in partitions: tags = ["topic:{}".format(topic), "partition:{}".format(partition)] @@ -90,22 +73,11 @@ def test_multiple_servers_zk(kafka_cluster, kafka_producer, zk_consumer, zk_inst ["source:zk", "consumer_group:{}".format(name)], at_least=1) -@pytest.mark.zookeeper -def test_check_nogroups_zk(kafka_cluster, kafka_producer, zk_consumer, zk_instance, aggregator): +@pytest.mark.usefixtures('dd_environment', 'kafka_producer', 'zk_consumer') +def test_check_nogroups_zk(aggregator, zk_instance): """ Testing Kafka_consumer check grabbing groups from ZK """ - if not is_supported(['zookeeper']): - pytest.skip("zookeeper consumer offsets not supported in current environment") - - if not kafka_producer.is_alive(): - kafka_producer.start() - time.sleep(5) - - if not zk_consumer.is_alive(): - zk_consumer.start() - time.sleep(5) - nogroup_instance = copy.deepcopy(zk_instance) nogroup_instance.pop('consumer_groups') nogroup_instance['monitor_unlisted_consumer_groups'] = True @@ -120,9 +92,8 @@ def test_check_nogroups_zk(kafka_cluster, kafka_producer, zk_consumer, zk_instan "partition:{}".format(partition)] for mname in BROKER_METRICS: aggregator.assert_metric(mname, tags=tags, at_least=1) - for mname in CONSUMER_METRICS: - aggregator.assert_metric(mname, tags=tags + - ["source:zk", "consumer_group:my_consumer"], at_least=1) + for mname in CONSUMER_METRICS: + aggregator.assert_metric(mname, tags=tags + ['source:zk', 'consumer_group:my_consumer'], at_least=1) else: for mname in BROKER_METRICS + CONSUMER_METRICS: aggregator.assert_metric(mname, at_least=1) diff --git a/kafka_consumer/tox.ini b/kafka_consumer/tox.ini index 85552953c15df..9dc6fe17e14ed 100644 --- a/kafka_consumer/tox.ini +++ b/kafka_consumer/tox.ini @@ -1,104 +1,33 @@ - [tox] minversion = 2.0 basepython = py27 envlist = - kafkaconsumer-{0_9_0_1,0_10_2_1,0_11_0_1,1_0_1,1_1_0}-{kafka,zk} + {0.9,0.10,0.11,1.0,1.1}-{kafka,zk} flake8 -[common] -commands = - pip install --require-hashes -r requirements.txt - [testenv] usedevelop = true -platform = linux2|darwin|win32 +platform = linux|darwin|win32 passenv = DOCKER* COMPOSE* deps = -e../datadog_checks_base[deps] -rrequirements-dev.txt - -[testenv:kafkaconsumer-0_9_0_1-kafka] -setenv = - KAFKA_VERSION=0.9.0.1-1 - KAFKA_OFFSETS_STORAGE=kafka -commands = - {[common]commands} - pytest -m"kafka" -v - -[testenv:kafkaconsumer-0_9_0_1-zk] -setenv = - KAFKA_VERSION=0.9.0.1-1 - KAFKA_OFFSETS_STORAGE=zookeeper -commands = - {[common]commands} - pytest -m"zookeeper" -v - -[testenv:kafkaconsumer-0_10_2_1-kafka] -setenv = - KAFKA_VERSION=0.10.2.1 - KAFKA_OFFSETS_STORAGE=kafka -commands = - {[common]commands} - pytest -m"kafka" -v - -[testenv:kafkaconsumer-0_10_2_1-zk] -setenv = - KAFKA_VERSION=0.10.2.1 - KAFKA_OFFSETS_STORAGE=zookeeper -commands = - {[common]commands} - pytest -m"zookeeper" -v - -[testenv:kafkaconsumer-0_11_0_1-kafka] -setenv = - KAFKA_VERSION=0.11.0.1 - KAFKA_OFFSETS_STORAGE=kafka commands = - {[common]commands} - pytest -m"kafka" -v - -[testenv:kafkaconsumer-0_11_0_1-zk] -setenv = - KAFKA_VERSION=0.11.0.1 - KAFKA_OFFSETS_STORAGE=zookeeper -commands = - {[common]commands} - pytest -m"zookeeper" -v - -[testenv:kafkaconsumer-1_0_1-kafka] -setenv = - KAFKA_VERSION=1.0.1 - KAFKA_OFFSETS_STORAGE=kafka -commands = - {[common]commands} - pytest -m"kafka" -v - -[testenv:kafkaconsumer-1_0_1-zk] -setenv = - KAFKA_VERSION=1.0.1 - KAFKA_OFFSETS_STORAGE=zookeeper -commands = - {[common]commands} - pytest -m"zookeeper" -v - -[testenv:kafkaconsumer-1_1_0-kafka] -setenv = - KAFKA_VERSION=1.1.0 - KAFKA_OFFSETS_STORAGE=kafka -commands = - {[common]commands} - pytest -m"kafka" -v - -[testenv:kafkaconsumer-1_1_0-zk] -setenv = - KAFKA_VERSION=1.1.0 - KAFKA_OFFSETS_STORAGE=zookeeper -commands = - {[common]commands} - pytest -m"zookeeper" -v + pip install --require-hashes -r requirements.txt + pytest -v +setenv = + kafka: KAFKA_OFFSETS_STORAGE=kafka + zk: KAFKA_OFFSETS_STORAGE=zookeeper + 0.9: KAFKA_VERSION=0.9.0.1-1 + 0.10: KAFKA_VERSION=0.10.2.1 + 0.11: KAFKA_VERSION=0.11.0.1 + 1.0: KAFKA_VERSION=1.0.1 + 1.1: KAFKA_VERSION=1.1.0 + +[testenv:1.1-kafka] +basepython = python3.6 [testenv:flake8] skip_install = true