
Commit

fix lag in seconds metric
piochelepiotr committed May 4, 2022
1 parent aadab87 commit 5408f67
Showing 2 changed files with 43 additions and 37 deletions.
22 changes: 11 additions & 11 deletions kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py
@@ -1,13 +1,13 @@
# (C) Datadog, Inc. 2019-present
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)
import json
from collections import defaultdict
from time import time

from kafka import errors as kafka_errors
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy, OffsetResponse
from kafka.structs import TopicPartition
from six.moves import cPickle as pickle

from datadog_checks.base import AgentCheck, ConfigurationError
from datadog_checks.kafka_consumer.datadog_agent import read_persistent_cache, write_persistent_cache
@@ -98,17 +98,17 @@ def check(self):
self._collect_broker_metadata()

def _load_broker_timestamps(self):
"""Loads broker timestamps from persistant cache."""
"""Loads broker timestamps from persistent cache."""
self._broker_timestamps = defaultdict(dict)
try:
self._broker_timestamps.update(
pickle.loads(read_persistent_cache(self._broker_timestamp_cache_key))
) # use update since defaultdict does not pickle
except Exception:
self.log.warning('Could not read broker timestamps from cache')
for topic_partition, content in json.loads(read_persistent_cache(self._broker_timestamp_cache_key)).items():
for offset, timestamp in content.items():
self._broker_timestamps[topic_partition][int(offset)] = timestamp
except Exception as e:
self.log.warning('Could not read broker timestamps from cache' + str(e))

def _save_broker_timestamps(self):
write_persistent_cache(self._broker_timestamp_cache_key, pickle.dumps(self._broker_timestamps))
write_persistent_cache(self._broker_timestamp_cache_key, json.dumps(self._broker_timestamps))

def _create_kafka_admin_client(self, api_version):
"""Return a KafkaAdminClient."""
@@ -192,7 +192,7 @@ def _highwater_offsets_callback(self, response):
error_type = kafka_errors.for_code(error_code)
if error_type is kafka_errors.NoError:
self._highwater_offsets[(topic, partition)] = offsets[0]
timestamps = self._broker_timestamps[(topic, partition)]
timestamps = self._broker_timestamps["{}_{}".format(topic, partition)]
timestamps[offsets[0]] = time()
# If there's too many timestamps, we delete the oldest
if len(timestamps) > MAX_TIMESTAMPS:
@@ -280,13 +280,13 @@ def _report_consumer_offsets_and_lag(self, contexts_limit):

if reported_contexts >= contexts_limit:
continue
timestamps = self._broker_timestamps[(topic, partition)]
timestamps = self._broker_timestamps["{}_{}".format(topic, partition)]
# producer_timestamp is set in the same check, so it should never be None
producer_timestamp = timestamps[producer_offset]
consumer_timestamp = self._get_interpolated_timestamp(timestamps, consumer_offset)
if consumer_timestamp is None or producer_timestamp is None:
continue
lag = consumer_timestamp - producer_timestamp
lag = producer_timestamp - consumer_timestamp
self.gauge('consumer_lag_seconds', lag, tags=consumer_group_tags)
reported_contexts += 1
else:
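
The last change in this hunk is the fix named in the commit title: lag in seconds is the broker's high-watermark timestamp minus the consumer's estimated timestamp, which comes out positive when the consumer is behind. With the numbers the updated tests use below:

producer_timestamp = 400  # time() when the broker reported its latest offset
consumer_timestamp = 300  # estimated time at which the consumer's current offset was produced
lag_seconds = producer_timestamp - consumer_timestamp  # 100; the old formula gave -100
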
58 changes: 32 additions & 26 deletions kafka_consumer/tests/test_kafka_consumer.py
@@ -2,8 +2,8 @@
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)
import copy
import json
import os
import pickle
from collections import defaultdict

import mock
@@ -23,15 +23,20 @@
BROKER_METRICS = ['kafka.broker_offset']

CONSUMER_METRICS = ['kafka.consumer_offset', 'kafka.consumer_lag']
if not is_legacy_check():
CONSUMER_METRICS.append("kafka.consumer_lag_seconds")


def mocked_read_persistent_cache(cache_key):
cached_offsets = defaultdict(dict)
cached_offsets[("marvel", 0)][25] = 150
cached_offsets[("marvel", 0)][45] = 250
return pickle.dumps(cached_offsets)
cached_offsets["marvel_0"][25] = 150
cached_offsets["marvel_0"][40] = 200
return json.dumps(cached_offsets)


def mocked_time():
# broker offset 80 will be set to timestamp 400.
# knowing that from the cache, offset 40 is set at 200, and that the consumer is at offset 60,
# the timestamp of the consumer will be at 300. So time lag is 400-300=100seconds.
return 400


@pytest.mark.unit
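
The comment in mocked_time above spells out the expected arithmetic: the mocked cache seeds offset 40 at timestamp 200, the check records the broker's offset 80 at the mocked time 400, and the consumer sits at offset 60, so its timestamp is interpolated to 300 and the reported kafka.consumer_lag_seconds is 400 - 300 = 100. The _get_interpolated_timestamp helper the check calls is not part of this diff; a linear-interpolation sketch that reproduces those numbers (an illustration, not the check's actual implementation):

def get_interpolated_timestamp(timestamps, offset):
    """Estimate a timestamp for `offset` by interpolating between known offset -> timestamp points."""
    if offset in timestamps:
        return timestamps[offset]
    older = [o for o in timestamps if o < offset]
    newer = [o for o in timestamps if o > offset]
    if not older or not newer:
        return None  # outside the known range; the real check may handle this differently
    o1, o2 = max(older), min(newer)
    t1, t2 = timestamps[o1], timestamps[o2]
    return t1 + (t2 - t1) * (offset - o1) / (o2 - o1)


timestamps = {25: 150, 40: 200, 80: 400}  # cached points plus the freshly recorded high watermark
consumer_timestamp = get_interpolated_timestamp(timestamps, 60)  # 300.0
lag_seconds = timestamps[80] - consumer_timestamp                # 100.0
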
@@ -127,16 +132,15 @@ def test_tls_config_legacy(extra_config, expected_http_kwargs, kafka_instance):

@pytest.mark.integration
@pytest.mark.usefixtures('dd_environment')
@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache)
@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.time', mocked_time)
def test_check_kafka(aggregator, kafka_instance, dd_run_check):
"""
Testing Kafka_consumer check.
"""
with mock.patch(
'datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache
):
kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
dd_run_check(kafka_consumer_check)
assert_check_kafka(aggregator, kafka_instance['consumer_groups'])
kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
dd_run_check(kafka_consumer_check)
assert_check_kafka(aggregator, kafka_instance['consumer_groups'])


@pytest.mark.integration
@@ -178,6 +182,10 @@ def assert_check_kafka(aggregator, consumer_groups):
aggregator.assert_metric(mname, tags=tags, at_least=1)
for mname in CONSUMER_METRICS:
aggregator.assert_metric(mname, tags=tags + ["consumer_group:{}".format(name)], at_least=1)
if not is_legacy_check():
aggregator.assert_metric(
"kafka.consumer_lag_seconds", tags=tags + ["consumer_group:{}".format(name)], count=1, value=100
)

aggregator.assert_all_metrics_covered()

@@ -196,28 +204,26 @@ def test_consumer_config_error(caplog, dd_run_check):
@pytest.mark.skipif(is_legacy_check(), reason="This test does not apply to the legacy check.")
@pytest.mark.integration
@pytest.mark.usefixtures('dd_environment')
@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache)
@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.time', mocked_time)
def test_no_topics(aggregator, kafka_instance, dd_run_check):
with mock.patch(
'datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache
):
kafka_instance['consumer_groups'] = {'my_consumer': {}}
kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
dd_run_check(kafka_consumer_check)
kafka_instance['consumer_groups'] = {'my_consumer': {}}
kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
dd_run_check(kafka_consumer_check)

assert_check_kafka(aggregator, {'my_consumer': {'marvel': [0]}})
assert_check_kafka(aggregator, {'my_consumer': {'marvel': [0]}})


@pytest.mark.integration
@pytest.mark.usefixtures('dd_environment')
@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache)
@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.time', mocked_time)
def test_no_partitions(aggregator, kafka_instance, dd_run_check):
with mock.patch(
'datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache
):
kafka_instance['consumer_groups'] = {'my_consumer': {'marvel': []}}
kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
dd_run_check(kafka_consumer_check)
kafka_instance['consumer_groups'] = {'my_consumer': {'marvel': []}}
kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
dd_run_check(kafka_consumer_check)

assert_check_kafka(aggregator, {'my_consumer': {'marvel': [0]}})
assert_check_kafka(aggregator, {'my_consumer': {'marvel': [0]}})


@pytest.mark.skipif(os.environ.get('KAFKA_VERSION', '').startswith('0.9'), reason='Old Kafka version')
