diff --git a/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py
index de79f1ef44f7bb..99853af926ac81 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py
+++ b/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py
@@ -1,13 +1,13 @@
 # (C) Datadog, Inc. 2019-present
 # All rights reserved
 # Licensed under Simplified BSD License (see LICENSE)
+import json
 from collections import defaultdict
 from time import time
 
 from kafka import errors as kafka_errors
 from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy, OffsetResponse
 from kafka.structs import TopicPartition
-from six.moves import cPickle as pickle
 
 from datadog_checks.base import AgentCheck, ConfigurationError
 from datadog_checks.kafka_consumer.datadog_agent import read_persistent_cache, write_persistent_cache
@@ -98,17 +98,17 @@ def check(self):
         self._collect_broker_metadata()
 
     def _load_broker_timestamps(self):
-        """Loads broker timestamps from persistant cache."""
+        """Loads broker timestamps from persistent cache."""
         self._broker_timestamps = defaultdict(dict)
         try:
-            self._broker_timestamps.update(
-                pickle.loads(read_persistent_cache(self._broker_timestamp_cache_key))
-            )  # use update since defaultdict does not pickle
-        except Exception:
-            self.log.warning('Could not read broker timestamps from cache')
+            for topic_partition, content in json.loads(read_persistent_cache(self._broker_timestamp_cache_key)).items():
+                for offset, timestamp in content.items():
+                    self._broker_timestamps[topic_partition][int(offset)] = timestamp
+        except Exception as e:
+            self.log.warning('Could not read broker timestamps from cache: %s', str(e))
 
     def _save_broker_timestamps(self):
-        write_persistent_cache(self._broker_timestamp_cache_key, pickle.dumps(self._broker_timestamps))
+        write_persistent_cache(self._broker_timestamp_cache_key, json.dumps(self._broker_timestamps))
 
     def _create_kafka_admin_client(self, api_version):
         """Return a KafkaAdminClient."""
@@ -192,7 +192,7 @@ def _highwater_offsets_callback(self, response):
                 error_type = kafka_errors.for_code(error_code)
                 if error_type is kafka_errors.NoError:
                     self._highwater_offsets[(topic, partition)] = offsets[0]
-                    timestamps = self._broker_timestamps[(topic, partition)]
+                    timestamps = self._broker_timestamps["{}_{}".format(topic, partition)]
                     timestamps[offsets[0]] = time()
                     # If there's too many timestamps, we delete the oldest
                     if len(timestamps) > MAX_TIMESTAMPS:
@@ -280,13 +280,13 @@ def _report_consumer_offsets_and_lag(self, contexts_limit):
 
                 if reported_contexts >= contexts_limit:
                     continue
-                timestamps = self._broker_timestamps[(topic, partition)]
+                timestamps = self._broker_timestamps["{}_{}".format(topic, partition)]
                 # producer_timestamp is set in the same check, so it should never be None
                 producer_timestamp = timestamps[producer_offset]
                 consumer_timestamp = self._get_interpolated_timestamp(timestamps, consumer_offset)
                 if consumer_timestamp is None or producer_timestamp is None:
                     continue
-                lag = consumer_timestamp - producer_timestamp
+                lag = producer_timestamp - consumer_timestamp
                 self.gauge('consumer_lag_seconds', lag, tags=consumer_group_tags)
                 reported_contexts += 1
             else:
diff --git a/kafka_consumer/tests/test_kafka_consumer.py b/kafka_consumer/tests/test_kafka_consumer.py
index cd46dcc064f938..b2197daf3d2023 100644
--- a/kafka_consumer/tests/test_kafka_consumer.py
+++ b/kafka_consumer/tests/test_kafka_consumer.py
@@ -2,8 +2,8 @@
 # All rights reserved
 # Licensed under Simplified BSD License (see LICENSE)
 import copy
+import json
 import os
-import pickle
 from collections import defaultdict
 
 import mock
@@ -23,15 +23,20 @@
 BROKER_METRICS = ['kafka.broker_offset']
 
 CONSUMER_METRICS = ['kafka.consumer_offset', 'kafka.consumer_lag']
-if not is_legacy_check():
-    CONSUMER_METRICS.append("kafka.consumer_lag_seconds")
 
 
 def mocked_read_persistent_cache(cache_key):
     cached_offsets = defaultdict(dict)
-    cached_offsets[("marvel", 0)][25] = 150
-    cached_offsets[("marvel", 0)][45] = 250
-    return pickle.dumps(cached_offsets)
+    cached_offsets["marvel_0"][25] = 150
+    cached_offsets["marvel_0"][40] = 200
+    return json.dumps(cached_offsets)
+
+
+def mocked_time():
+    # Broker offset 80 will be set to timestamp 400.
+    # Knowing from the cache that offset 40 is set at 200, and that the consumer is at offset 60,
+    # the timestamp of the consumer will be at 300. So the time lag is 400 - 300 = 100 seconds.
+    return 400
 
 
 @pytest.mark.unit
@@ -127,16 +132,15 @@ def test_tls_config_legacy(extra_config, expected_http_kwargs, kafka_instance):
 
 @pytest.mark.integration
 @pytest.mark.usefixtures('dd_environment')
+@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache)
+@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.time', mocked_time)
 def test_check_kafka(aggregator, kafka_instance, dd_run_check):
     """
     Testing Kafka_consumer check.
    """
-    with mock.patch(
-        'datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache
-    ):
-        kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
-        dd_run_check(kafka_consumer_check)
-        assert_check_kafka(aggregator, kafka_instance['consumer_groups'])
+    kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
+    dd_run_check(kafka_consumer_check)
+    assert_check_kafka(aggregator, kafka_instance['consumer_groups'])
 
 
 @pytest.mark.integration
@@ -178,6 +182,10 @@ def assert_check_kafka(aggregator, consumer_groups):
                     aggregator.assert_metric(mname, tags=tags, at_least=1)
                 for mname in CONSUMER_METRICS:
                     aggregator.assert_metric(mname, tags=tags + ["consumer_group:{}".format(name)], at_least=1)
+                if not is_legacy_check():
+                    aggregator.assert_metric(
+                        "kafka.consumer_lag_seconds", tags=tags + ["consumer_group:{}".format(name)], count=1, value=100
+                    )
 
     aggregator.assert_all_metrics_covered()
 
@@ -196,28 +204,26 @@ def test_consumer_config_error(caplog, dd_run_check):
 @pytest.mark.skipif(is_legacy_check(), reason="This test does not apply to the legacy check.")
 @pytest.mark.integration
 @pytest.mark.usefixtures('dd_environment')
+@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache)
+@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.time', mocked_time)
 def test_no_topics(aggregator, kafka_instance, dd_run_check):
-    with mock.patch(
-        'datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache
-    ):
-        kafka_instance['consumer_groups'] = {'my_consumer': {}}
-        kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
-        dd_run_check(kafka_consumer_check)
+    kafka_instance['consumer_groups'] = {'my_consumer': {}}
+    kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
+    dd_run_check(kafka_consumer_check)
 
-        assert_check_kafka(aggregator, {'my_consumer': {'marvel': [0]}})
+    assert_check_kafka(aggregator, {'my_consumer': {'marvel': [0]}})
 
 
 @pytest.mark.integration
 @pytest.mark.usefixtures('dd_environment')
+@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache)
+@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.time', mocked_time)
 def test_no_partitions(aggregator, kafka_instance, dd_run_check):
-    with mock.patch(
-        'datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache
-    ):
-        kafka_instance['consumer_groups'] = {'my_consumer': {'marvel': []}}
-        kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
-        dd_run_check(kafka_consumer_check)
+    kafka_instance['consumer_groups'] = {'my_consumer': {'marvel': []}}
+    kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
+    dd_run_check(kafka_consumer_check)
 
-        assert_check_kafka(aggregator, {'my_consumer': {'marvel': [0]}})
+    assert_check_kafka(aggregator, {'my_consumer': {'marvel': [0]}})
 
 
 @pytest.mark.skipif(os.environ.get('KAFKA_VERSION', '').startswith('0.9'), reason='Old Kafka version')
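
Reviewer note (illustrative, not part of the patch): a minimal standalone sketch of why the persistent cache changes shape when moving from pickle to JSON. JSON object keys must be strings, so the (topic, partition) tuple keys become "topic_partition" strings, and the integer offset keys come back as strings on load, which is why _load_broker_timestamps casts them with int(offset). The values below mirror the mocked_read_persistent_cache fixture.

import json
from collections import defaultdict

# Build the cache the same way the check does: {"topic_partition": {offset: timestamp}}.
broker_timestamps = defaultdict(dict)
broker_timestamps["marvel_0"][25] = 150
broker_timestamps["marvel_0"][40] = 200

# defaultdict is a dict subclass, so json.dumps serializes it directly (no pickle needed).
serialized = json.dumps(broker_timestamps)

# On load, the integer offsets come back as the strings "25" and "40",
# hence the int(offset) cast in _load_broker_timestamps.
restored = defaultdict(dict)
for topic_partition, content in json.loads(serialized).items():
    for offset, timestamp in content.items():
        restored[topic_partition][int(offset)] = timestamp

assert restored["marvel_0"] == {25: 150, 40: 200}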
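
Reviewer note (illustrative, not part of the patch): a rough sketch of the arithmetic behind the expected value=100 for kafka.consumer_lag_seconds in assert_check_kafka. The interpolate helper is a hypothetical, simplified stand-in for the check's _get_interpolated_timestamp, whose implementation is not shown in this diff; only the offsets and timestamps come from the mocked cache and mocked_time above.

# Known (offset -> timestamp) points: 40 -> 200 comes from the mocked cache,
# 80 -> 400 is recorded by the check at the mocked time().
timestamps = {40: 200, 80: 400}
producer_offset = 80  # broker highwater offset
consumer_offset = 60  # current position of the consumer group


def interpolate(points, offset):
    # Hypothetical stand-in: linear interpolation between the two known surrounding points.
    (o1, t1), (o2, t2) = sorted(points.items())
    return t1 + (offset - o1) * (t2 - t1) / (o2 - o1)


producer_timestamp = timestamps[producer_offset]  # 400
consumer_timestamp = interpolate(timestamps, consumer_offset)  # 200 + 20 * 200 / 40 = 300
lag_seconds = producer_timestamp - consumer_timestamp  # the corrected sign: producer minus consumer
assert lag_seconds == 100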