kafka_consumer: go back to python 3.8
piochelepiotr committed May 3, 2022
1 parent 0bf6c28 commit ddb0dd9
Showing 3 changed files with 16 additions and 12 deletions.
14 changes: 8 additions & 6 deletions kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py
@@ -3,21 +3,20 @@
# Licensed under Simplified BSD License (see LICENSE)
from collections import defaultdict
from time import time
import os

from kafka import errors as kafka_errors
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy, OffsetResponse
from kafka.structs import TopicPartition
from six.moves import cPickle as pickle

from datadog_checks.base import AgentCheck, ConfigurationError

from datadog_checks.kafka_consumer.datadog_agent import read_persistent_cache, write_persistent_cache

from .constants import BROKER_REQUESTS_BATCH_SIZE, KAFKA_INTERNAL_TOPICS
from six.moves import cPickle as pickle

MAX_TIMESTAMPS = 1000


class NewKafkaConsumerCheck(object):
"""
Check the offsets and lag of Kafka consumers. This check also returns broker highwater offsets.
@@ -102,7 +101,9 @@ def _load_broker_timestamps(self):
"""Loads broker timestamps from persistant cache."""
self._broker_timestamps = defaultdict(dict)
try:
self._broker_timestamps.update(pickle.loads(read_persistent_cache(self._broker_timestamp_cache_key))) # use update since defaultdict does not pickle
self._broker_timestamps.update(
pickle.loads(read_persistent_cache(self._broker_timestamp_cache_key))
) # use update since defaultdict does not pickle
except Exception:
self.log.warning('Could not read broker timestamps from cache')

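As an aside, here is a minimal, self-contained sketch of the pickle round-trip this hunk relies on. The in-memory cache and the key name are hypothetical stand-ins for the Agent's read_persistent_cache/write_persistent_cache; only the load-via-update() pattern mirrors the diff:

    import pickle
    from collections import defaultdict

    _cache = {}  # hypothetical in-memory stand-in for the Agent's persistent cache

    def write_persistent_cache(key, value):
        _cache[key] = value

    def read_persistent_cache(key):
        return _cache.get(key, b'')

    # Write side: pickle the per-partition offset -> timestamp mapping under a cache key.
    timestamps = defaultdict(dict)
    timestamps[('mytopic', 0)][100] = 1651500000.0
    write_persistent_cache('broker_timestamps_', pickle.dumps(timestamps))

    # Read side: update() into a fresh defaultdict, as the diff does, so the
    # default factory is in place no matter what mapping type was cached.
    restored = defaultdict(dict)
    restored.update(pickle.loads(read_persistent_cache('broker_timestamps_')))
    assert restored[('mytopic', 0)][100] == 1651500000.0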
@@ -280,9 +281,10 @@ def _report_consumer_offsets_and_lag(self, contexts_limit):
if reported_contexts >= contexts_limit:
continue
timestamps = self._broker_timestamps[(topic, partition)]
# producer_timestamp is set in the same check, so it should never be None
producer_timestamp = timestamps[producer_offset]
consumer_timestamp = self._get_interpolated_timestamp(timestamps, consumer_offset)
if consumer_timestamp is None:
if consumer_timestamp is None or producer_timestamp is None:
continue
lag = consumer_timestamp - producer_timestamp
self.gauge('consumer_lag_seconds', lag, tags=consumer_group_tags)
@@ -303,7 +305,7 @@ def _report_consumer_offsets_and_lag(self, contexts_limit):

def _get_interpolated_timestamp(self, timestamps, offset):
if offset in timestamps:
return timestamps[offset], True
return timestamps[offset]
offsets = timestamps.keys()
try:
# Get the saved offsets closest to the consumer_offset
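The hunk above is truncated, but together with test_get_interpolated_timestamp in the test file below it pins down the intended behavior: given at least two cached offset -> timestamp points, the helper linearly interpolates (or extrapolates) a timestamp for the requested offset, and returns None when it cannot. A reconstruction consistent with those tests (a sketch, not the exact shipped code):

    def get_interpolated_timestamp(timestamps, offset):
        """Estimate when `offset` was produced from cached {offset: timestamp} points."""
        if offset in timestamps:
            return timestamps[offset]
        if len(timestamps) < 2:
            return None  # a line needs two points
        # Take the two cached offsets closest to the requested one.
        left, right = sorted(timestamps, key=lambda o: abs(o - offset))[:2]
        # Linear interpolation (or extrapolation when offset lies outside the range).
        slope = (timestamps[right] - timestamps[left]) / (right - left)
        return timestamps[left] + slope * (offset - left)

    assert get_interpolated_timestamp({0: 100, 10: 200}, 5) == 150   # interpolate
    assert get_interpolated_timestamp({0: 100, 10: 200}, 15) == 250  # extrapolate, as in the test below
    assert get_interpolated_timestamp({10: 200}, 15) is None         # not enough points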
6 changes: 4 additions & 2 deletions kafka_consumer/tests/test_kafka_consumer.py
@@ -3,11 +3,11 @@
# Licensed under Simplified BSD License (see LICENSE)
import copy
import os
import pickle
from collections import defaultdict

import mock
import pytest
import pickle
from collections import defaultdict

from datadog_checks.kafka_consumer import KafkaCheck
from datadog_checks.kafka_consumer.legacy_0_10_2 import LegacyKafkaCheck_0_10_2
@@ -33,6 +33,7 @@ def mocked_read_persistent_cache(cache_key):
cached_offsets[("marvel", 0)][45] = 250
return pickle.dumps(cached_offsets)


@pytest.mark.unit
def test_uses_legacy_implementation_when_legacy_version_specified(kafka_instance):
instance = copy.deepcopy(kafka_instance)
@@ -67,6 +68,7 @@ def test_get_interpolated_timestamp(kafka_instance):
assert check.sub_check._get_interpolated_timestamp({0: 100, 10: 200}, 15) == 250
assert check.sub_check._get_interpolated_timestamp({10: 200}, 15) is None


@pytest.mark.unit
def test_gssapi(kafka_instance, dd_run_check):
instance = copy.deepcopy(kafka_instance)
8 changes: 4 additions & 4 deletions kafka_consumer/tox.ini
@@ -1,18 +1,18 @@
[tox]
isolated_build = true
minversion = 2.0
basepython = py39
basepython = py38
envlist =
py27-{0.9,latest}-{kafka,zk}
py39-{0.9,0.11,1.1,2.3,latest}-{kafka,zk}
py38-{0.9,0.11,1.1,2.3,latest}-{kafka,zk}

[testenv]
ensure_default_envdir = true
envdir =
py27: {toxworkdir}/py27
py39: {toxworkdir}/py39
py38: {toxworkdir}/py38
description =
py{27,39}-{0.9,latest}-{kafka,zk}: e2e ready
py{27,38}-{0.9,latest}-{kafka,zk}: e2e ready
dd_check_style = true
usedevelop = true
platform = linux|darwin|win32
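With this change the suite's default interpreter moves back from Python 3.9 to 3.8. Given the envlist above, a local run against the newest Kafka flavor would presumably be invoked as tox -e py38-latest-kafka (the environment name here is derived from the envlist and shown only as an illustration).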
