diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index 8fe98b5f99567..e5ac6850e8e70 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -16,6 +16,7 @@ from kazoo.exceptions import NoNodeError from six import iteritems, itervalues, string_types, text_type +from datadog_checks.base.utils.containers import hash_mutable from datadog_checks.base import AgentCheck, is_affirmative # Kafka Errors @@ -493,13 +494,13 @@ def _get_consumer_offsets(self, client, consumer_group, topic_partitions, coord_ def _should_zk(self, zk_hosts_ports, interval, kafka_collect=False): if not kafka_collect or not interval: return True - + zk_hosts_ports_hash = hash_mutable(zk_hosts_ports) now = time() - last = self._zk_last_ts.get(zk_hosts_ports, 0) + last = self._zk_last_ts.get(zk_hosts_ports_hash, 0) should_zk = False if now - last >= interval: - self._zk_last_ts[zk_hosts_ports] = last + self._zk_last_ts[zk_hosts_ports_hash] = last should_zk = True return should_zk diff --git a/kafka_consumer/tests/test_kafka_consumer_zk.py b/kafka_consumer/tests/test_kafka_consumer_zk.py index 4a4a13a7d2885..560ec780bfb8f 100644 --- a/kafka_consumer/tests/test_kafka_consumer_zk.py +++ b/kafka_consumer/tests/test_kafka_consumer_zk.py @@ -2,12 +2,14 @@ # All rights reserved # Licensed under Simplified BSD License (see LICENSE) import copy +import time import pytest from six import iteritems +from datadog_checks.base.utils.containers import hash_mutable from datadog_checks.kafka_consumer import KafkaCheck -from .common import HOST, PARTITIONS, TOPICS, is_supported +from .common import HOST, PARTITIONS, TOPICS, ZK_CONNECT_STR, is_supported pytestmark = pytest.mark.skipif( not is_supported('zookeeper'), @@ -97,3 +99,15 @@ def test_check_nogroups_zk(aggregator, zk_instance): else: for mname in BROKER_METRICS + CONSUMER_METRICS: aggregator.assert_metric(mname, at_least=1) + + +def test_should_zk(): + check = KafkaCheck('kafka_consumer', {}, {}) + # Kafka Consumer Offsets set to True and we have a zk_connect_str that hasn't been run yet + assert (check._should_zk([ZK_CONNECT_STR, ZK_CONNECT_STR], 10, True) is True) + # Kafka Consumer Offsets is set to False, should immediately ZK + assert (check._should_zk(ZK_CONNECT_STR, 10, False) is True) + # Last time we checked ZK_CONNECT_STR was less than interval ago, shouldn't ZK + zk_connect_hash = hash_mutable(ZK_CONNECT_STR) + check._zk_last_ts[zk_connect_hash] = time.time() + assert (check._should_zk(ZK_CONNECT_STR, 100, True) is False)