From ddb0dd9b6f9108876aa719af8773ad31a89e1829 Mon Sep 17 00:00:00 2001
From: Piotr Wolski
Date: Tue, 3 May 2022 10:17:25 +0200
Subject: [PATCH] kafka_consumer: go back to python 3.8

---
 .../kafka_consumer/new_kafka_consumer.py      | 14 ++++++++------
 kafka_consumer/tests/test_kafka_consumer.py   |  6 ++++--
 kafka_consumer/tox.ini                        |  8 ++++----
 3 files changed, 16 insertions(+), 12 deletions(-)

diff --git a/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py
index aaf7c9ec42a9a..de79f1ef44f7b 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py
+++ b/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py
@@ -3,21 +3,20 @@
 # Licensed under Simplified BSD License (see LICENSE)
 from collections import defaultdict
 from time import time
-import os
 
 from kafka import errors as kafka_errors
 from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy, OffsetResponse
 from kafka.structs import TopicPartition
+from six.moves import cPickle as pickle
 
 from datadog_checks.base import AgentCheck, ConfigurationError
-
 from datadog_checks.kafka_consumer.datadog_agent import read_persistent_cache, write_persistent_cache
 
 from .constants import BROKER_REQUESTS_BATCH_SIZE, KAFKA_INTERNAL_TOPICS
-from six.moves import cPickle as pickle
 
 MAX_TIMESTAMPS = 1000
 
+
 class NewKafkaConsumerCheck(object):
     """
     Check the offsets and lag of Kafka consumers. This check also returns broker highwater offsets.
@@ -102,7 +101,9 @@ def _load_broker_timestamps(self):
         """Loads broker timestamps from persistant cache."""
         self._broker_timestamps = defaultdict(dict)
         try:
-            self._broker_timestamps.update(pickle.loads(read_persistent_cache(self._broker_timestamp_cache_key)))  # use update since defaultdict does not pickle
+            self._broker_timestamps.update(
+                pickle.loads(read_persistent_cache(self._broker_timestamp_cache_key))
+            )  # use update since defaultdict does not pickle
         except Exception:
             self.log.warning('Could not read broker timestamps from cache')
 
@@ -280,9 +281,10 @@ def _report_consumer_offsets_and_lag(self, contexts_limit):
                 if reported_contexts >= contexts_limit:
                     continue
                 timestamps = self._broker_timestamps[(topic, partition)]
+                # producer_timestamp is set in the same check, so it should never be None
                 producer_timestamp = timestamps[producer_offset]
                 consumer_timestamp = self._get_interpolated_timestamp(timestamps, consumer_offset)
-                if consumer_timestamp is None:
+                if consumer_timestamp is None or producer_timestamp is None:
                     continue
                 lag = consumer_timestamp - producer_timestamp
                 self.gauge('consumer_lag_seconds', lag, tags=consumer_group_tags)
@@ -303,7 +305,7 @@ def _report_consumer_offsets_and_lag(self, contexts_limit):
 
     def _get_interpolated_timestamp(self, timestamps, offset):
         if offset in timestamps:
-            return timestamps[offset], True
+            return timestamps[offset]
         offsets = timestamps.keys()
         try:
             # Get the most close saved offsets to the consumer_offset
diff --git a/kafka_consumer/tests/test_kafka_consumer.py b/kafka_consumer/tests/test_kafka_consumer.py
index b3afe5669c973..25fd8a310d3db 100644
--- a/kafka_consumer/tests/test_kafka_consumer.py
+++ b/kafka_consumer/tests/test_kafka_consumer.py
@@ -3,11 +3,11 @@
 # Licensed under Simplified BSD License (see LICENSE)
 import copy
 import os
+import pickle
+from collections import defaultdict
 
 import mock
 import pytest
-import pickle
-from collections import defaultdict
 
 from datadog_checks.kafka_consumer import KafkaCheck
 from datadog_checks.kafka_consumer.legacy_0_10_2 import LegacyKafkaCheck_0_10_2
@@ -33,6 +33,7 @@ def mocked_read_persistent_cache(cache_key):
     cached_offsets[("marvel", 0)][45] = 250
     return pickle.dumps(cached_offsets)
 
+
 @pytest.mark.unit
 def test_uses_legacy_implementation_when_legacy_version_specified(kafka_instance):
     instance = copy.deepcopy(kafka_instance)
@@ -67,6 +68,7 @@ def test_get_interpolated_timestamp(kafka_instance):
     assert check.sub_check._get_interpolated_timestamp({0: 100, 10: 200}, 15) == 250
     assert check.sub_check._get_interpolated_timestamp({10: 200}, 15) is None
 
+
 @pytest.mark.unit
 def test_gssapi(kafka_instance, dd_run_check):
     instance = copy.deepcopy(kafka_instance)
diff --git a/kafka_consumer/tox.ini b/kafka_consumer/tox.ini
index ef954f1575498..32673ab9a0074 100644
--- a/kafka_consumer/tox.ini
+++ b/kafka_consumer/tox.ini
@@ -1,18 +1,18 @@
 [tox]
 isolated_build = true
 minversion = 2.0
-basepython = py39
+basepython = py38
 envlist =
     py27-{0.9,latest}-{kafka,zk}
-    py39-{0.9,0.11,1.1,2.3,latest}-{kafka,zk}
+    py38-{0.9,0.11,1.1,2.3,latest}-{kafka,zk}
 
 [testenv]
 ensure_default_envdir = true
 envdir =
     py27: {toxworkdir}/py27
-    py39: {toxworkdir}/py39
+    py38: {toxworkdir}/py38
 description =
-    py{27,39}-{0.9,latest}-{kafka,zk}: e2e ready
+    py{27,38}-{0.9,latest}-{kafka,zk}: e2e ready
 dd_check_style = true
 usedevelop = true
 platform = linux|darwin|win32
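
Note: the _get_interpolated_timestamp hunk above makes an exact offset hit return the timestamp alone (previously a "(timestamp, True)" tuple), and the updated test asserts that {0: 100, 10: 200} at offset 15 yields 250 while a single cached sample yields None. Below is a minimal standalone sketch of that expected behaviour, assuming plain linear interpolation/extrapolation over the cached {offset: timestamp} samples; interpolate_timestamp is a hypothetical name for illustration, not the check's actual code.

def interpolate_timestamp(timestamps, offset):
    """Estimate the timestamp of `offset` from known {offset: timestamp} samples."""
    if offset in timestamps:
        # Exact hit: return the timestamp alone, mirroring the patched return value.
        return timestamps[offset]
    known = sorted(timestamps)
    if len(known) < 2:
        # One sample is not enough to draw a line through; matches the `is None` assertion.
        return None
    if offset <= known[0]:
        left, right = known[0], known[1]      # extrapolate before the first sample
    elif offset >= known[-1]:
        left, right = known[-2], known[-1]    # extrapolate past the last sample
    else:
        right = next(o for o in known if o > offset)
        left = known[known.index(right) - 1]  # bracketing pair around the offset
    slope = (timestamps[right] - timestamps[left]) / (right - left)
    return timestamps[left] + slope * (offset - left)

# e.g. interpolate_timestamp({0: 100, 10: 200}, 15) == 250
#      interpolate_timestamp({10: 200}, 15) is None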