From 5047ca11ae6c5b206c62b6876eaefab1d8d250de Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Wed, 20 Apr 2022 14:28:23 +0200 Subject: [PATCH 01/14] kafka_consumer: Add lag new metric: lag in seconds --- .../kafka_consumer/new_kafka_consumer.py | 80 ++++++++++++++++++- kafka_consumer/tests/test_kafka_consumer.py | 2 +- kafka_consumer/tox.ini | 8 +- 3 files changed, 83 insertions(+), 7 deletions(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py index aa7ce93b1a283..02345ac0de0a3 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py @@ -2,6 +2,8 @@ # All rights reserved # Licensed under Simplified BSD License (see LICENSE) from collections import defaultdict +from time import time +import os from kafka import errors as kafka_errors from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy, OffsetResponse @@ -10,7 +12,10 @@ from datadog_checks.base import AgentCheck, ConfigurationError from .constants import BROKER_REQUESTS_BATCH_SIZE, KAFKA_INTERNAL_TOPICS +from six.moves import cPickle as pickle +MAX_TIMESTAMPS = 1000 +BROKER_TIMESTAMPS_FILENAME = '/tmp/broker_timestamps' class NewKafkaConsumerCheck(object): """ @@ -23,6 +28,7 @@ def __init__(self, parent_check): self._parent_check = parent_check self._broker_requests_batch_size = self.instance.get('broker_requests_batch_size', BROKER_REQUESTS_BATCH_SIZE) self._kafka_client = None + self._broker_timestamps_filename = BROKER_TIMESTAMPS_FILENAME + self.instance.get('kafka_connect_str') def __getattr__(self, item): try: @@ -61,6 +67,7 @@ def check(self): self.log.exception("There was a problem collecting consumer offsets from Kafka.") # don't raise because we might get valid broker offsets + self._load_broker_timestamps() # Fetch the broker highwater offsets try: if len(self._consumer_offsets) < self._context_limit: @@ -82,12 +89,37 @@ def check(self): self._context_limit, ) + self._save_broker_timestamps() + # Report the metrics self._report_highwater_offsets(self._context_limit) self._report_consumer_offsets_and_lag(self._context_limit - len(self._highwater_offsets)) self._collect_broker_metadata() + def _load_broker_timestamps(self): + """Loads broker timestamps from disk.""" + self._broker_timestamps = defaultdict(dict) + try: + with open(self._broker_timestamps_filename, 'rb') as f: + self._broker_timestamps.update(pickle.load(f)) # use update since defaultdict does not pickle + except Exception: + # file may be corrupted from agent restart during writing + # remove the file, it will be created again further down + self.log.warning('Could not read broker timestamps on disk. 
Removing file...') + try: + os.remove(self._broker_timestamps_filename) + except OSError: + self.log.exception('Error removing broker timestamps file %s', self._broker_timestamps_filename) + pass + + def _save_broker_timestamps(self): + try: + with open(self._broker_timestamps_filename, 'wb') as f: + pickle.dump(self._broker_timestamps, f) + except Exception: + self.log.exception('Could not write the broker timestamps on the disk') + def _create_kafka_admin_client(self, api_version): """Return a KafkaAdminClient.""" # TODO accept None (which inherits kafka-python default of localhost:9092) @@ -170,6 +202,11 @@ def _highwater_offsets_callback(self, response): error_type = kafka_errors.for_code(error_code) if error_type is kafka_errors.NoError: self._highwater_offsets[(topic, partition)] = offsets[0] + timestamps = self._broker_timestamps[(topic, partition)] + timestamps[offsets[0]] = time() + # If there's too many timestamps, we delete the oldest + if len(timestamps) > MAX_TIMESTAMPS: + del timestamps[min(timestamps)] elif error_type is kafka_errors.NotLeaderForPartitionError: self.log.warning( "Kafka broker returned %s (error_code %s) for topic %s, partition: %s. This should only happen " @@ -233,8 +270,8 @@ def _report_consumer_offsets_and_lag(self, contexts_limit): partition, ) continue - - consumer_lag = self._highwater_offsets[(topic, partition)] - consumer_offset + producer_offset = self._highwater_offsets[(topic, partition)] + consumer_lag = producer_offset - consumer_offset if reported_contexts < contexts_limit: self.gauge('consumer_lag', consumer_lag, tags=consumer_group_tags) reported_contexts += 1 @@ -251,6 +288,16 @@ def _report_consumer_offsets_and_lag(self, contexts_limit): self.send_event(title, message, consumer_group_tags, 'consumer_lag', key, severity="error") self.log.debug(message) + if reported_contexts >= contexts_limit: + continue + timestamps = self._broker_timestamps[(topic, partition)] + producer_timestamp = timestamps[producer_offset] + consumer_timestamp = self._get_interpolated_timestamp(timestamps, consumer_offset) + if consumer_timestamp is None: + continue + lag = consumer_timestamp - producer_timestamp + self.gauge('consumer_lag_seconds', lag, tags=consumer_group_tags) + reported_contexts += 1 else: if partitions is None: msg = ( @@ -265,6 +312,35 @@ def _report_consumer_offsets_and_lag(self, contexts_limit): self.log.warning(msg, consumer_group, topic, partition) self.kafka_client._client.cluster.request_update() # force metadata update on next poll() + def _get_interpolated_timestamp(self, timestamps, offset): + if offset in timestamps: + return timestamps[offset], True + offsets = timestamps.keys() + try: + # Get the most close saved offsets to the consumer_offset + offset_before = max([o for o in offsets if o < offset]) + offset_after = min([o for o in offsets if o > offset]) + except ValueError: + if len(offsets) < 2: + self.log.debug("Can't compute the timestamp as we don't have enough offsets history yet") + return None + # We couldn't find offsets before and after the current consumer offset. + # This happens when you start a consumer to replay data in the past: + # - We provision a consumer at t0 that will start consuming from t1 (t1 << t0). + # - It starts building a history of offset/timestamp pairs from the moment it started to run, i.e. t0. + # - So there is no offset/timestamp pair in the local history between t1 -> t0. 
+ # We'll take the min and max offsets available and assume the timestamp is an affine function + # of the offset to compute an approximate broker timestamp corresponding to the current consumer offset. + offset_before = min(offsets) + offset_after = max(offsets) + + # We assume that the timestamp is an affine function of the offset + timestamp_before = timestamps[offset_before] + timestamp_after = timestamps[offset_after] + slope = (timestamp_after - timestamp_before) / float(offset_after - offset_before) + timestamp = slope * (offset - offset_after) + timestamp_after + return timestamp + def _get_consumer_offsets(self): """Fetch Consumer Group offsets from Kafka. diff --git a/kafka_consumer/tests/test_kafka_consumer.py b/kafka_consumer/tests/test_kafka_consumer.py index 075322494d42c..33d8a13ec8952 100644 --- a/kafka_consumer/tests/test_kafka_consumer.py +++ b/kafka_consumer/tests/test_kafka_consumer.py @@ -20,7 +20,7 @@ BROKER_METRICS = ['kafka.broker_offset'] -CONSUMER_METRICS = ['kafka.consumer_offset', 'kafka.consumer_lag'] +CONSUMER_METRICS = ['kafka.consumer_offset', 'kafka.consumer_lag', "kafka.consumer_lag_seconds"] @pytest.mark.unit diff --git a/kafka_consumer/tox.ini b/kafka_consumer/tox.ini index 32673ab9a0074..ef954f1575498 100644 --- a/kafka_consumer/tox.ini +++ b/kafka_consumer/tox.ini @@ -1,18 +1,18 @@ [tox] isolated_build = true minversion = 2.0 -basepython = py38 +basepython = py39 envlist = py27-{0.9,latest}-{kafka,zk} - py38-{0.9,0.11,1.1,2.3,latest}-{kafka,zk} + py39-{0.9,0.11,1.1,2.3,latest}-{kafka,zk} [testenv] ensure_default_envdir = true envdir = py27: {toxworkdir}/py27 - py38: {toxworkdir}/py38 + py39: {toxworkdir}/py39 description = - py{27,38}-{0.9,latest}-{kafka,zk}: e2e ready + py{27,39}-{0.9,latest}-{kafka,zk}: e2e ready dd_check_style = true usedevelop = true platform = linux|darwin|win32 From 0bf6c286abe7c55d42c1c6c781463a93aca43c9b Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Tue, 3 May 2022 10:12:38 +0200 Subject: [PATCH 02/14] kafka_consumer: Add kafka.consumer_lag_seconds metric --- .../kafka_consumer/datadog_agent.py | 17 +++++++++++ .../kafka_consumer/new_kafka_consumer.py | 25 +++++----------- kafka_consumer/tests/test_kafka_consumer.py | 30 +++++++++++++++++-- 3 files changed, 52 insertions(+), 20 deletions(-) create mode 100644 kafka_consumer/datadog_checks/kafka_consumer/datadog_agent.py diff --git a/kafka_consumer/datadog_checks/kafka_consumer/datadog_agent.py b/kafka_consumer/datadog_checks/kafka_consumer/datadog_agent.py new file mode 100644 index 0000000000000..1548d309c38c6 --- /dev/null +++ b/kafka_consumer/datadog_checks/kafka_consumer/datadog_agent.py @@ -0,0 +1,17 @@ +# (C) Datadog, Inc. 
2020-present +# All rights reserved +# Licensed under Simplified BSD License (see LICENSE) + +from typing import Optional + +try: + from datadog_agent import read_persistent_cache, write_persistent_cache +except ImportError: + + def write_persistent_cache(key, value): + # type: (str, str) -> None + pass + + def read_persistent_cache(key): + # type: (str) -> Optional[str] + return '' diff --git a/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py index 02345ac0de0a3..aaf7c9ec42a9a 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py @@ -11,11 +11,12 @@ from datadog_checks.base import AgentCheck, ConfigurationError +from datadog_checks.kafka_consumer.datadog_agent import read_persistent_cache, write_persistent_cache + from .constants import BROKER_REQUESTS_BATCH_SIZE, KAFKA_INTERNAL_TOPICS from six.moves import cPickle as pickle MAX_TIMESTAMPS = 1000 -BROKER_TIMESTAMPS_FILENAME = '/tmp/broker_timestamps' class NewKafkaConsumerCheck(object): """ @@ -28,7 +29,7 @@ def __init__(self, parent_check): self._parent_check = parent_check self._broker_requests_batch_size = self.instance.get('broker_requests_batch_size', BROKER_REQUESTS_BATCH_SIZE) self._kafka_client = None - self._broker_timestamps_filename = BROKER_TIMESTAMPS_FILENAME + self.instance.get('kafka_connect_str') + self._broker_timestamp_cache_key = 'broker_timestamps' + self.instance.get('kafka_connect_str', "") def __getattr__(self, item): try: @@ -98,27 +99,15 @@ def check(self): self._collect_broker_metadata() def _load_broker_timestamps(self): - """Loads broker timestamps from disk.""" + """Loads broker timestamps from persistant cache.""" self._broker_timestamps = defaultdict(dict) try: - with open(self._broker_timestamps_filename, 'rb') as f: - self._broker_timestamps.update(pickle.load(f)) # use update since defaultdict does not pickle + self._broker_timestamps.update(pickle.loads(read_persistent_cache(self._broker_timestamp_cache_key))) # use update since defaultdict does not pickle except Exception: - # file may be corrupted from agent restart during writing - # remove the file, it will be created again further down - self.log.warning('Could not read broker timestamps on disk. 
Removing file...') - try: - os.remove(self._broker_timestamps_filename) - except OSError: - self.log.exception('Error removing broker timestamps file %s', self._broker_timestamps_filename) - pass + self.log.warning('Could not read broker timestamps from cache') def _save_broker_timestamps(self): - try: - with open(self._broker_timestamps_filename, 'wb') as f: - pickle.dump(self._broker_timestamps, f) - except Exception: - self.log.exception('Could not write the broker timestamps on the disk') + write_persistent_cache(self._broker_timestamp_cache_key, pickle.dumps(self._broker_timestamps)) def _create_kafka_admin_client(self, api_version): """Return a KafkaAdminClient.""" diff --git a/kafka_consumer/tests/test_kafka_consumer.py b/kafka_consumer/tests/test_kafka_consumer.py index 33d8a13ec8952..b3afe5669c973 100644 --- a/kafka_consumer/tests/test_kafka_consumer.py +++ b/kafka_consumer/tests/test_kafka_consumer.py @@ -6,6 +6,8 @@ import mock import pytest +import pickle +from collections import defaultdict from datadog_checks.kafka_consumer import KafkaCheck from datadog_checks.kafka_consumer.legacy_0_10_2 import LegacyKafkaCheck_0_10_2 @@ -20,9 +22,17 @@ BROKER_METRICS = ['kafka.broker_offset'] -CONSUMER_METRICS = ['kafka.consumer_offset', 'kafka.consumer_lag', "kafka.consumer_lag_seconds"] +CONSUMER_METRICS = ['kafka.consumer_offset', 'kafka.consumer_lag'] +if not is_legacy_check(): + CONSUMER_METRICS.append("kafka.consumer_lag_seconds") +def mocked_read_persistent_cache(cache_key): + cached_offsets = defaultdict(dict) + cached_offsets[("marvel", 0)][25] = 150 + cached_offsets[("marvel", 0)][45] = 250 + return pickle.dumps(cached_offsets) + @pytest.mark.unit def test_uses_legacy_implementation_when_legacy_version_specified(kafka_instance): instance = copy.deepcopy(kafka_instance) @@ -43,6 +53,20 @@ def test_uses_new_implementation_when_new_version_specified(kafka_instance): assert isinstance(kafka_consumer_check.sub_check, NewKafkaConsumerCheck) +@pytest.mark.unit +def test_get_interpolated_timestamp(kafka_instance): + instance = copy.deepcopy(kafka_instance) + instance['kafka_client_api_version'] = '0.10.2' + instance['sasl_kerberos_service_name'] = 'kafka' + check = KafkaCheck('kafka_consumer', {}, [instance]) + check._init_check_based_on_kafka_version() + # at offset 0, time is 100s, at offset 10, time is 200sec. + # by interpolation, at offset 5, time should be 150sec. + assert check.sub_check._get_interpolated_timestamp({0: 100, 10: 200}, 5) == 150 + assert check.sub_check._get_interpolated_timestamp({10: 100, 20: 200}, 5) == 50 + assert check.sub_check._get_interpolated_timestamp({0: 100, 10: 200}, 15) == 250 + assert check.sub_check._get_interpolated_timestamp({10: 200}, 15) is None + @pytest.mark.unit def test_gssapi(kafka_instance, dd_run_check): instance = copy.deepcopy(kafka_instance) @@ -101,13 +125,13 @@ def test_tls_config_legacy(extra_config, expected_http_kwargs, kafka_instance): @pytest.mark.integration @pytest.mark.usefixtures('dd_environment') +@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache) def test_check_kafka(aggregator, kafka_instance, dd_run_check): """ Testing Kafka_consumer check. 
""" kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance]) dd_run_check(kafka_consumer_check) - assert_check_kafka(aggregator, kafka_instance['consumer_groups']) @@ -168,6 +192,7 @@ def test_consumer_config_error(caplog, dd_run_check): @pytest.mark.skipif(is_legacy_check(), reason="This test does not apply to the legacy check.") @pytest.mark.integration @pytest.mark.usefixtures('dd_environment') +@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache) def test_no_topics(aggregator, kafka_instance, dd_run_check): kafka_instance['consumer_groups'] = {'my_consumer': {}} kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance]) @@ -178,6 +203,7 @@ def test_no_topics(aggregator, kafka_instance, dd_run_check): @pytest.mark.integration @pytest.mark.usefixtures('dd_environment') +@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache) def test_no_partitions(aggregator, kafka_instance, dd_run_check): kafka_instance['consumer_groups'] = {'my_consumer': {'marvel': []}} kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance]) From ddb0dd9b6f9108876aa719af8773ad31a89e1829 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Tue, 3 May 2022 10:17:25 +0200 Subject: [PATCH 03/14] kafka_consumer: go back to python 3.8 --- .../kafka_consumer/new_kafka_consumer.py | 14 ++++++++------ kafka_consumer/tests/test_kafka_consumer.py | 6 ++++-- kafka_consumer/tox.ini | 8 ++++---- 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py index aaf7c9ec42a9a..de79f1ef44f7b 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py @@ -3,21 +3,20 @@ # Licensed under Simplified BSD License (see LICENSE) from collections import defaultdict from time import time -import os from kafka import errors as kafka_errors from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy, OffsetResponse from kafka.structs import TopicPartition +from six.moves import cPickle as pickle from datadog_checks.base import AgentCheck, ConfigurationError - from datadog_checks.kafka_consumer.datadog_agent import read_persistent_cache, write_persistent_cache from .constants import BROKER_REQUESTS_BATCH_SIZE, KAFKA_INTERNAL_TOPICS -from six.moves import cPickle as pickle MAX_TIMESTAMPS = 1000 + class NewKafkaConsumerCheck(object): """ Check the offsets and lag of Kafka consumers. This check also returns broker highwater offsets. 
@@ -102,7 +101,9 @@ def _load_broker_timestamps(self): """Loads broker timestamps from persistant cache.""" self._broker_timestamps = defaultdict(dict) try: - self._broker_timestamps.update(pickle.loads(read_persistent_cache(self._broker_timestamp_cache_key))) # use update since defaultdict does not pickle + self._broker_timestamps.update( + pickle.loads(read_persistent_cache(self._broker_timestamp_cache_key)) + ) # use update since defaultdict does not pickle except Exception: self.log.warning('Could not read broker timestamps from cache') @@ -280,9 +281,10 @@ def _report_consumer_offsets_and_lag(self, contexts_limit): if reported_contexts >= contexts_limit: continue timestamps = self._broker_timestamps[(topic, partition)] + # producer_timestamp is set in the same check, so it should never be None producer_timestamp = timestamps[producer_offset] consumer_timestamp = self._get_interpolated_timestamp(timestamps, consumer_offset) - if consumer_timestamp is None: + if consumer_timestamp is None or producer_timestamp is None: continue lag = consumer_timestamp - producer_timestamp self.gauge('consumer_lag_seconds', lag, tags=consumer_group_tags) @@ -303,7 +305,7 @@ def _report_consumer_offsets_and_lag(self, contexts_limit): def _get_interpolated_timestamp(self, timestamps, offset): if offset in timestamps: - return timestamps[offset], True + return timestamps[offset] offsets = timestamps.keys() try: # Get the most close saved offsets to the consumer_offset diff --git a/kafka_consumer/tests/test_kafka_consumer.py b/kafka_consumer/tests/test_kafka_consumer.py index b3afe5669c973..25fd8a310d3db 100644 --- a/kafka_consumer/tests/test_kafka_consumer.py +++ b/kafka_consumer/tests/test_kafka_consumer.py @@ -3,11 +3,11 @@ # Licensed under Simplified BSD License (see LICENSE) import copy import os +import pickle +from collections import defaultdict import mock import pytest -import pickle -from collections import defaultdict from datadog_checks.kafka_consumer import KafkaCheck from datadog_checks.kafka_consumer.legacy_0_10_2 import LegacyKafkaCheck_0_10_2 @@ -33,6 +33,7 @@ def mocked_read_persistent_cache(cache_key): cached_offsets[("marvel", 0)][45] = 250 return pickle.dumps(cached_offsets) + @pytest.mark.unit def test_uses_legacy_implementation_when_legacy_version_specified(kafka_instance): instance = copy.deepcopy(kafka_instance) @@ -67,6 +68,7 @@ def test_get_interpolated_timestamp(kafka_instance): assert check.sub_check._get_interpolated_timestamp({0: 100, 10: 200}, 15) == 250 assert check.sub_check._get_interpolated_timestamp({10: 200}, 15) is None + @pytest.mark.unit def test_gssapi(kafka_instance, dd_run_check): instance = copy.deepcopy(kafka_instance) diff --git a/kafka_consumer/tox.ini b/kafka_consumer/tox.ini index ef954f1575498..32673ab9a0074 100644 --- a/kafka_consumer/tox.ini +++ b/kafka_consumer/tox.ini @@ -1,18 +1,18 @@ [tox] isolated_build = true minversion = 2.0 -basepython = py39 +basepython = py38 envlist = py27-{0.9,latest}-{kafka,zk} - py39-{0.9,0.11,1.1,2.3,latest}-{kafka,zk} + py38-{0.9,0.11,1.1,2.3,latest}-{kafka,zk} [testenv] ensure_default_envdir = true envdir = py27: {toxworkdir}/py27 - py39: {toxworkdir}/py39 + py38: {toxworkdir}/py38 description = - py{27,39}-{0.9,latest}-{kafka,zk}: e2e ready + py{27,38}-{0.9,latest}-{kafka,zk}: e2e ready dd_check_style = true usedevelop = true platform = linux|darwin|win32 From 5f3786f7d5fc59f9ebfe1d858937291f62f514a9 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Tue, 3 May 2022 15:18:56 +0200 Subject: [PATCH 04/14] don't use 
mock.patch decorator (not available in python 2.7 --- kafka_consumer/tests/test_kafka_consumer.py | 34 ++++++++++++--------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/kafka_consumer/tests/test_kafka_consumer.py b/kafka_consumer/tests/test_kafka_consumer.py index 25fd8a310d3db..cd46dcc064f93 100644 --- a/kafka_consumer/tests/test_kafka_consumer.py +++ b/kafka_consumer/tests/test_kafka_consumer.py @@ -127,14 +127,16 @@ def test_tls_config_legacy(extra_config, expected_http_kwargs, kafka_instance): @pytest.mark.integration @pytest.mark.usefixtures('dd_environment') -@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache) def test_check_kafka(aggregator, kafka_instance, dd_run_check): """ Testing Kafka_consumer check. """ - kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance]) - dd_run_check(kafka_consumer_check) - assert_check_kafka(aggregator, kafka_instance['consumer_groups']) + with mock.patch( + 'datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache + ): + kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance]) + dd_run_check(kafka_consumer_check) + assert_check_kafka(aggregator, kafka_instance['consumer_groups']) @pytest.mark.integration @@ -194,24 +196,28 @@ def test_consumer_config_error(caplog, dd_run_check): @pytest.mark.skipif(is_legacy_check(), reason="This test does not apply to the legacy check.") @pytest.mark.integration @pytest.mark.usefixtures('dd_environment') -@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache) def test_no_topics(aggregator, kafka_instance, dd_run_check): - kafka_instance['consumer_groups'] = {'my_consumer': {}} - kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance]) - dd_run_check(kafka_consumer_check) + with mock.patch( + 'datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache + ): + kafka_instance['consumer_groups'] = {'my_consumer': {}} + kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance]) + dd_run_check(kafka_consumer_check) - assert_check_kafka(aggregator, {'my_consumer': {'marvel': [0]}}) + assert_check_kafka(aggregator, {'my_consumer': {'marvel': [0]}}) @pytest.mark.integration @pytest.mark.usefixtures('dd_environment') -@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache) def test_no_partitions(aggregator, kafka_instance, dd_run_check): - kafka_instance['consumer_groups'] = {'my_consumer': {'marvel': []}} - kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance]) - dd_run_check(kafka_consumer_check) + with mock.patch( + 'datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache + ): + kafka_instance['consumer_groups'] = {'my_consumer': {'marvel': []}} + kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance]) + dd_run_check(kafka_consumer_check) - assert_check_kafka(aggregator, {'my_consumer': {'marvel': [0]}}) + assert_check_kafka(aggregator, {'my_consumer': {'marvel': [0]}}) @pytest.mark.skipif(os.environ.get('KAFKA_VERSION', '').startswith('0.9'), reason='Old Kafka version') From aadab87ccd579f4c3271f7904b762445566f0719 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Tue, 3 May 2022 16:42:09 +0200 Subject: [PATCH 05/14] add metric to metadata file --- kafka_consumer/metadata.csv | 1 + 1 file 
changed, 1 insertion(+) diff --git a/kafka_consumer/metadata.csv b/kafka_consumer/metadata.csv index 11ec6aec16bbb..e7f547d2bf581 100644 --- a/kafka_consumer/metadata.csv +++ b/kafka_consumer/metadata.csv @@ -2,3 +2,4 @@ metric_name,metric_type,interval,unit_name,per_unit_name,description,orientation kafka.broker_offset,gauge,,offset,,Current message offset on broker.,0,kafka,broker offset, kafka.consumer_lag,gauge,,offset,,Lag in messages between consumer and broker.,-1,kafka,consumer lag, kafka.consumer_offset,gauge,,offset,,Current message offset on consumer.,0,kafka,consumer offset, +kafka.consumer_lag_seconds,gauge,,seconds,,Lag in seconds between consumer and broker.,-1,kafka,consumer time lag, From 6380152d183d2cd56244bcaa122b71025cdec077 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Wed, 4 May 2022 11:23:11 +0200 Subject: [PATCH 06/14] fix lag in seconds metric --- .../kafka_consumer/new_kafka_consumer.py | 22 +++---- kafka_consumer/metadata.csv | 2 +- kafka_consumer/tests/test_kafka_consumer.py | 58 ++++++++++--------- 3 files changed, 44 insertions(+), 38 deletions(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py index de79f1ef44f7b..99853af926ac8 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py @@ -1,13 +1,13 @@ # (C) Datadog, Inc. 2019-present # All rights reserved # Licensed under Simplified BSD License (see LICENSE) +import json from collections import defaultdict from time import time from kafka import errors as kafka_errors from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy, OffsetResponse from kafka.structs import TopicPartition -from six.moves import cPickle as pickle from datadog_checks.base import AgentCheck, ConfigurationError from datadog_checks.kafka_consumer.datadog_agent import read_persistent_cache, write_persistent_cache @@ -98,17 +98,17 @@ def check(self): self._collect_broker_metadata() def _load_broker_timestamps(self): - """Loads broker timestamps from persistant cache.""" + """Loads broker timestamps from persistent cache.""" self._broker_timestamps = defaultdict(dict) try: - self._broker_timestamps.update( - pickle.loads(read_persistent_cache(self._broker_timestamp_cache_key)) - ) # use update since defaultdict does not pickle - except Exception: - self.log.warning('Could not read broker timestamps from cache') + for topic_partition, content in json.loads(read_persistent_cache(self._broker_timestamp_cache_key)).items(): + for offset, timestamp in content.items(): + self._broker_timestamps[topic_partition][int(offset)] = timestamp + except Exception as e: + self.log.warning('Could not read broker timestamps from cache' + str(e)) def _save_broker_timestamps(self): - write_persistent_cache(self._broker_timestamp_cache_key, pickle.dumps(self._broker_timestamps)) + write_persistent_cache(self._broker_timestamp_cache_key, json.dumps(self._broker_timestamps)) def _create_kafka_admin_client(self, api_version): """Return a KafkaAdminClient.""" @@ -192,7 +192,7 @@ def _highwater_offsets_callback(self, response): error_type = kafka_errors.for_code(error_code) if error_type is kafka_errors.NoError: self._highwater_offsets[(topic, partition)] = offsets[0] - timestamps = self._broker_timestamps[(topic, partition)] + timestamps = self._broker_timestamps["{}_{}".format(topic, partition)] timestamps[offsets[0]] = time() # If there's too many 
timestamps, we delete the oldest if len(timestamps) > MAX_TIMESTAMPS: @@ -280,13 +280,13 @@ def _report_consumer_offsets_and_lag(self, contexts_limit): if reported_contexts >= contexts_limit: continue - timestamps = self._broker_timestamps[(topic, partition)] + timestamps = self._broker_timestamps["{}_{}".format(topic, partition)] # producer_timestamp is set in the same check, so it should never be None producer_timestamp = timestamps[producer_offset] consumer_timestamp = self._get_interpolated_timestamp(timestamps, consumer_offset) if consumer_timestamp is None or producer_timestamp is None: continue - lag = consumer_timestamp - producer_timestamp + lag = producer_timestamp - consumer_timestamp self.gauge('consumer_lag_seconds', lag, tags=consumer_group_tags) reported_contexts += 1 else: diff --git a/kafka_consumer/metadata.csv b/kafka_consumer/metadata.csv index e7f547d2bf581..2461c74b314cb 100644 --- a/kafka_consumer/metadata.csv +++ b/kafka_consumer/metadata.csv @@ -2,4 +2,4 @@ metric_name,metric_type,interval,unit_name,per_unit_name,description,orientation kafka.broker_offset,gauge,,offset,,Current message offset on broker.,0,kafka,broker offset, kafka.consumer_lag,gauge,,offset,,Lag in messages between consumer and broker.,-1,kafka,consumer lag, kafka.consumer_offset,gauge,,offset,,Current message offset on consumer.,0,kafka,consumer offset, -kafka.consumer_lag_seconds,gauge,,seconds,,Lag in seconds between consumer and broker.,-1,kafka,consumer time lag, +kafka.consumer_lag_seconds,gauge,,second,,Lag in seconds between consumer and broker.,-1,kafka,consumer time lag, diff --git a/kafka_consumer/tests/test_kafka_consumer.py b/kafka_consumer/tests/test_kafka_consumer.py index cd46dcc064f93..b2197daf3d202 100644 --- a/kafka_consumer/tests/test_kafka_consumer.py +++ b/kafka_consumer/tests/test_kafka_consumer.py @@ -2,8 +2,8 @@ # All rights reserved # Licensed under Simplified BSD License (see LICENSE) import copy +import json import os -import pickle from collections import defaultdict import mock @@ -23,15 +23,20 @@ BROKER_METRICS = ['kafka.broker_offset'] CONSUMER_METRICS = ['kafka.consumer_offset', 'kafka.consumer_lag'] -if not is_legacy_check(): - CONSUMER_METRICS.append("kafka.consumer_lag_seconds") def mocked_read_persistent_cache(cache_key): cached_offsets = defaultdict(dict) - cached_offsets[("marvel", 0)][25] = 150 - cached_offsets[("marvel", 0)][45] = 250 - return pickle.dumps(cached_offsets) + cached_offsets["marvel_0"][25] = 150 + cached_offsets["marvel_0"][40] = 200 + return json.dumps(cached_offsets) + + +def mocked_time(): + # broker offset 80 will be set to timestamp 400. + # knowing that from the cache, offset 40 is set at 200, and that the consumer is at offset 60, + # the timestamp of the consumer will be at 300. So time lag is 400-300=100seconds. + return 400 @pytest.mark.unit @@ -127,16 +132,15 @@ def test_tls_config_legacy(extra_config, expected_http_kwargs, kafka_instance): @pytest.mark.integration @pytest.mark.usefixtures('dd_environment') +@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache) +@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.time', mocked_time) def test_check_kafka(aggregator, kafka_instance, dd_run_check): """ Testing Kafka_consumer check. 
""" - with mock.patch( - 'datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache - ): - kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance]) - dd_run_check(kafka_consumer_check) - assert_check_kafka(aggregator, kafka_instance['consumer_groups']) + kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance]) + dd_run_check(kafka_consumer_check) + assert_check_kafka(aggregator, kafka_instance['consumer_groups']) @pytest.mark.integration @@ -178,6 +182,10 @@ def assert_check_kafka(aggregator, consumer_groups): aggregator.assert_metric(mname, tags=tags, at_least=1) for mname in CONSUMER_METRICS: aggregator.assert_metric(mname, tags=tags + ["consumer_group:{}".format(name)], at_least=1) + if not is_legacy_check(): + aggregator.assert_metric( + "kafka.consumer_lag_seconds", tags=tags + ["consumer_group:{}".format(name)], count=1, value=100 + ) aggregator.assert_all_metrics_covered() @@ -196,28 +204,26 @@ def test_consumer_config_error(caplog, dd_run_check): @pytest.mark.skipif(is_legacy_check(), reason="This test does not apply to the legacy check.") @pytest.mark.integration @pytest.mark.usefixtures('dd_environment') +@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache) +@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.time', mocked_time) def test_no_topics(aggregator, kafka_instance, dd_run_check): - with mock.patch( - 'datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache - ): - kafka_instance['consumer_groups'] = {'my_consumer': {}} - kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance]) - dd_run_check(kafka_consumer_check) + kafka_instance['consumer_groups'] = {'my_consumer': {}} + kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance]) + dd_run_check(kafka_consumer_check) - assert_check_kafka(aggregator, {'my_consumer': {'marvel': [0]}}) + assert_check_kafka(aggregator, {'my_consumer': {'marvel': [0]}}) @pytest.mark.integration @pytest.mark.usefixtures('dd_environment') +@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache) +@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.time', mocked_time) def test_no_partitions(aggregator, kafka_instance, dd_run_check): - with mock.patch( - 'datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache - ): - kafka_instance['consumer_groups'] = {'my_consumer': {'marvel': []}} - kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance]) - dd_run_check(kafka_consumer_check) + kafka_instance['consumer_groups'] = {'my_consumer': {'marvel': []}} + kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance]) + dd_run_check(kafka_consumer_check) - assert_check_kafka(aggregator, {'my_consumer': {'marvel': [0]}}) + assert_check_kafka(aggregator, {'my_consumer': {'marvel': [0]}}) @pytest.mark.skipif(os.environ.get('KAFKA_VERSION', '').startswith('0.9'), reason='Old Kafka version') From d70c71ae2e248af8ca338c71f0dd12141342f053 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Wed, 4 May 2022 11:57:18 +0200 Subject: [PATCH 07/14] offset isn't 60 all the time --- kafka_consumer/tests/test_kafka_consumer.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/kafka_consumer/tests/test_kafka_consumer.py b/kafka_consumer/tests/test_kafka_consumer.py index 
b2197daf3d202..9519aadb1768b 100644 --- a/kafka_consumer/tests/test_kafka_consumer.py +++ b/kafka_consumer/tests/test_kafka_consumer.py @@ -33,9 +33,6 @@ def mocked_read_persistent_cache(cache_key): def mocked_time(): - # broker offset 80 will be set to timestamp 400. - # knowing that from the cache, offset 40 is set at 200, and that the consumer is at offset 60, - # the timestamp of the consumer will be at 300. So time lag is 400-300=100seconds. return 400 @@ -184,7 +181,7 @@ def assert_check_kafka(aggregator, consumer_groups): aggregator.assert_metric(mname, tags=tags + ["consumer_group:{}".format(name)], at_least=1) if not is_legacy_check(): aggregator.assert_metric( - "kafka.consumer_lag_seconds", tags=tags + ["consumer_group:{}".format(name)], count=1, value=100 + "kafka.consumer_lag_seconds", tags=tags + ["consumer_group:{}".format(name)], at_least=1 ) aggregator.assert_all_metrics_covered() From 80ccedc544de7f645b279ee498a26e816ff624b0 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Wed, 4 May 2022 11:58:21 +0200 Subject: [PATCH 08/14] formatting code --- .../datadog_checks/kafka_consumer/new_kafka_consumer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py index 99853af926ac8..32409e298a08a 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py @@ -104,8 +104,8 @@ def _load_broker_timestamps(self): for topic_partition, content in json.loads(read_persistent_cache(self._broker_timestamp_cache_key)).items(): for offset, timestamp in content.items(): self._broker_timestamps[topic_partition][int(offset)] = timestamp - except Exception as e: - self.log.warning('Could not read broker timestamps from cache' + str(e)) + except Exception: + self.log.warning('Could not read broker timestamps from cache') def _save_broker_timestamps(self): write_persistent_cache(self._broker_timestamp_cache_key, json.dumps(self._broker_timestamps)) From 251c6578d7bbea43f3fbeb2c4be4f89003eaa8e7 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Wed, 4 May 2022 17:01:55 +0200 Subject: [PATCH 09/14] fix e2e test --- kafka_consumer/tests/test_kafka_consumer.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kafka_consumer/tests/test_kafka_consumer.py b/kafka_consumer/tests/test_kafka_consumer.py index 9519aadb1768b..a415638673d0f 100644 --- a/kafka_consumer/tests/test_kafka_consumer.py +++ b/kafka_consumer/tests/test_kafka_consumer.py @@ -164,6 +164,8 @@ def test_check_kafka_metrics_limit(aggregator, kafka_instance, dd_run_check): @pytest.mark.e2e +@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache) +@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.time', mocked_time) def test_e2e(dd_agent_check, kafka_instance): aggregator = dd_agent_check(kafka_instance) From 5a68369d83c161ab3800bdf7cc3c047204ba3953 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Thu, 5 May 2022 11:24:27 +0200 Subject: [PATCH 10/14] don't use connection string as cache key --- .../datadog_checks/kafka_consumer/new_kafka_consumer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py index 32409e298a08a..e40ca807f2d10 100644 --- 
a/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py @@ -28,7 +28,7 @@ def __init__(self, parent_check): self._parent_check = parent_check self._broker_requests_batch_size = self.instance.get('broker_requests_batch_size', BROKER_REQUESTS_BATCH_SIZE) self._kafka_client = None - self._broker_timestamp_cache_key = 'broker_timestamps' + self.instance.get('kafka_connect_str', "") + self._broker_timestamp_cache_key = 'broker_timestamps' + "".join(sorted(self._custom_tags)) def __getattr__(self, item): try: From 59ba943ec58c271683b2e9ed368cf29251a3283d Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Thu, 5 May 2022 15:49:13 +0200 Subject: [PATCH 11/14] don't mock for e2e --- kafka_consumer/tests/test_kafka_consumer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka_consumer/tests/test_kafka_consumer.py b/kafka_consumer/tests/test_kafka_consumer.py index a415638673d0f..d0f07e9bc1e7f 100644 --- a/kafka_consumer/tests/test_kafka_consumer.py +++ b/kafka_consumer/tests/test_kafka_consumer.py @@ -10,6 +10,7 @@ import pytest from datadog_checks.kafka_consumer import KafkaCheck +from datadog_checks.kafka_consumer.datadog_agent import write_persistent_cache from datadog_checks.kafka_consumer.legacy_0_10_2 import LegacyKafkaCheck_0_10_2 from datadog_checks.kafka_consumer.new_kafka_consumer import NewKafkaConsumerCheck @@ -164,9 +165,8 @@ def test_check_kafka_metrics_limit(aggregator, kafka_instance, dd_run_check): @pytest.mark.e2e -@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache) -@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.time', mocked_time) def test_e2e(dd_agent_check, kafka_instance): + write_persistent_cache("broker_timestampsoptional:tag1", mocked_read_persistent_cache("")) aggregator = dd_agent_check(kafka_instance) assert_check_kafka(aggregator, kafka_instance['consumer_groups']) From a4c779f4996e20ecaf43930860e4952b76ee78f1 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Fri, 6 May 2022 01:07:54 +0200 Subject: [PATCH 12/14] e2e test doesn't emit lag in seconds --- kafka_consumer/tests/test_kafka_consumer.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/kafka_consumer/tests/test_kafka_consumer.py b/kafka_consumer/tests/test_kafka_consumer.py index d0f07e9bc1e7f..6573e1418f06b 100644 --- a/kafka_consumer/tests/test_kafka_consumer.py +++ b/kafka_consumer/tests/test_kafka_consumer.py @@ -166,13 +166,11 @@ def test_check_kafka_metrics_limit(aggregator, kafka_instance, dd_run_check): @pytest.mark.e2e def test_e2e(dd_agent_check, kafka_instance): - write_persistent_cache("broker_timestampsoptional:tag1", mocked_read_persistent_cache("")) aggregator = dd_agent_check(kafka_instance) - - assert_check_kafka(aggregator, kafka_instance['consumer_groups']) + assert_check_kafka(aggregator, kafka_instance['consumer_groups'], e2e=True) -def assert_check_kafka(aggregator, consumer_groups): +def assert_check_kafka(aggregator, consumer_groups, e2e=False): for name, consumer_group in consumer_groups.items(): for topic, partitions in consumer_group.items(): for partition in partitions: @@ -181,7 +179,10 @@ def assert_check_kafka(aggregator, consumer_groups): aggregator.assert_metric(mname, tags=tags, at_least=1) for mname in CONSUMER_METRICS: aggregator.assert_metric(mname, tags=tags + ["consumer_group:{}".format(name)], at_least=1) - if not is_legacy_check(): + if not 
is_legacy_check() and not e2e: + # in the e2e test, Kafka is not actively receiving data. So we never populate the broker + # timestamps with more than one timestamp. So we can't interpolate to get the consumer + # timestamp. aggregator.assert_metric( "kafka.consumer_lag_seconds", tags=tags + ["consumer_group:{}".format(name)], at_least=1 ) From bc57a7e00ba64a4f71d4db3748d8f2f7b49f68eb Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Fri, 6 May 2022 12:16:23 +0200 Subject: [PATCH 13/14] style --- kafka_consumer/tests/test_kafka_consumer.py | 1 - 1 file changed, 1 deletion(-) diff --git a/kafka_consumer/tests/test_kafka_consumer.py b/kafka_consumer/tests/test_kafka_consumer.py index 6573e1418f06b..b2f87706a9cf0 100644 --- a/kafka_consumer/tests/test_kafka_consumer.py +++ b/kafka_consumer/tests/test_kafka_consumer.py @@ -10,7 +10,6 @@ import pytest from datadog_checks.kafka_consumer import KafkaCheck -from datadog_checks.kafka_consumer.datadog_agent import write_persistent_cache from datadog_checks.kafka_consumer.legacy_0_10_2 import LegacyKafkaCheck_0_10_2 from datadog_checks.kafka_consumer.new_kafka_consumer import NewKafkaConsumerCheck From e2f26db91e64055035a3e9fcb0531e545b97359a Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Tue, 10 May 2022 14:06:22 +0200 Subject: [PATCH 14/14] include exception in message --- .../datadog_checks/kafka_consumer/new_kafka_consumer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py index e40ca807f2d10..e1ea4ecf8b320 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py @@ -104,8 +104,8 @@ def _load_broker_timestamps(self): for topic_partition, content in json.loads(read_persistent_cache(self._broker_timestamp_cache_key)).items(): for offset, timestamp in content.items(): self._broker_timestamps[topic_partition][int(offset)] = timestamp - except Exception: - self.log.warning('Could not read broker timestamps from cache') + except Exception as e: + self.log.warning('Could not read broker timestamps from cache: %s', str(e)) def _save_broker_timestamps(self): write_persistent_cache(self._broker_timestamp_cache_key, json.dumps(self._broker_timestamps))
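
Taken together, the series computes kafka.consumer_lag_seconds by keeping a bounded history of (highwater offset, observation time) pairs per topic/partition and estimating the consumer's timestamp by linear interpolation over that history. A minimal standalone sketch of the arithmetic follows (illustrative names only, not the check's API; it condenses _highwater_offsets_callback and _get_interpolated_timestamp, and reuses the numbers from test_get_interpolated_timestamp):

from time import time

MAX_TIMESTAMPS = 1000  # same bound as the constant introduced in PATCH 01


def record_highwater(timestamps, highwater_offset, now=None):
    # Remember when a given broker highwater offset was observed; keep the history bounded
    # by evicting the oldest offset, as _highwater_offsets_callback does.
    timestamps[highwater_offset] = now if now is not None else time()
    if len(timestamps) > MAX_TIMESTAMPS:
        del timestamps[min(timestamps)]


def interpolated_timestamp(timestamps, offset):
    # Estimate when `offset` was produced, assuming the timestamp is an affine function
    # of the offset between (or beyond) the recorded points.
    if offset in timestamps:
        return timestamps[offset]
    if len(timestamps) < 2:
        return None  # not enough history yet
    offsets = timestamps.keys()
    try:
        before = max(o for o in offsets if o < offset)
        after = min(o for o in offsets if o > offset)
    except ValueError:
        # The offset falls outside the recorded range (e.g. a consumer replaying old data),
        # so extrapolate from the full range instead.
        before, after = min(offsets), max(offsets)
    slope = (timestamps[after] - timestamps[before]) / float(after - before)
    return slope * (offset - after) + timestamps[after]


# Offset 0 observed at t=100s and offset 10 at t=200s, as in the unit test.
history = {}
record_highwater(history, 0, now=100)
record_highwater(history, 10, now=200)
assert interpolated_timestamp(history, 5) == 150   # interpolation between recorded offsets
assert interpolated_timestamp(history, 15) == 250  # extrapolation past the newest offset

# Lag in seconds is the broker (producer) timestamp minus the interpolated consumer timestamp.
producer_offset, consumer_offset = 10, 5
lag_seconds = history[producer_offset] - interpolated_timestamp(history, consumer_offset)
assert lag_seconds == 50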
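
PATCH 06 also changes how that history is persisted: pickle is replaced with JSON in the Agent's persistent cache, which is why the (topic, partition) key is flattened to a "topic_partition" string and why offsets are coerced back to int after json.loads (JSON object keys are always strings). A rough sketch of the round-trip, using in-memory stand-ins for read/write_persistent_cache rather than the real datadog_agent helpers:

import json
from collections import defaultdict

_cache = {}  # in-memory stand-in; the real functions live in the datadog_agent module


def write_persistent_cache(key, value):
    _cache[key] = value


def read_persistent_cache(key):
    return _cache.get(key, '')


def save_broker_timestamps(cache_key, broker_timestamps):
    # defaultdict serializes like a plain dict; int offsets become string keys in JSON.
    write_persistent_cache(cache_key, json.dumps(broker_timestamps))


def load_broker_timestamps(cache_key):
    broker_timestamps = defaultdict(dict)
    try:
        for topic_partition, content in json.loads(read_persistent_cache(cache_key)).items():
            for offset, timestamp in content.items():
                broker_timestamps[topic_partition][int(offset)] = timestamp
    except Exception:
        pass  # the check logs a warning and starts over with an empty history
    return broker_timestamps


timestamps = defaultdict(dict)
timestamps["marvel_0"][25] = 150  # topic "marvel", partition 0, offset 25 seen at t=150s
save_broker_timestamps("broker_timestamps", timestamps)
assert load_broker_timestamps("broker_timestamps")["marvel_0"] == {25: 150}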