kafka_consumer: Add kafka.consumer_lag_seconds metric
piochelepiotr committed May 3, 2022
1 parent 5047ca1 commit 0bf6c28
Showing 3 changed files with 52 additions and 20 deletions.
17 changes: 17 additions & 0 deletions kafka_consumer/datadog_checks/kafka_consumer/datadog_agent.py
@@ -0,0 +1,17 @@
# (C) Datadog, Inc. 2020-present
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)

from typing import Optional

try:
    from datadog_agent import read_persistent_cache, write_persistent_cache
except ImportError:

    def write_persistent_cache(key, value):
        # type: (str, str) -> None
        pass

    def read_persistent_cache(key):
        # type: (str) -> Optional[str]
        return ''
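This shim imports the persistent cache API exposed by the Agent's embedded `datadog_agent` module, and falls back to the no-op stubs above when the check runs outside the Agent (for example in unit tests). A minimal sketch of the round trip the check builds on top of it, assuming a pickled mapping stored under a per-instance key; the key string and sample values below are illustrative, not taken from the commit:

from collections import defaultdict

from six.moves import cPickle as pickle

from datadog_checks.kafka_consumer.datadog_agent import read_persistent_cache, write_persistent_cache

# (topic, partition) -> {offset: timestamp at which that offset was observed}
broker_timestamps = defaultdict(dict)
broker_timestamps[("marvel", 0)][25] = 150

# Persist across check runs and Agent restarts (illustrative key, mirroring the
# 'broker_timestamps' + kafka_connect_str scheme used by new_kafka_consumer.py).
write_persistent_cache('broker_timestamps' + 'localhost:9092', pickle.dumps(broker_timestamps))

# Load on the next run; update() is used so the target keeps its defaultdict behaviour.
restored = defaultdict(dict)
cached = read_persistent_cache('broker_timestamps' + 'localhost:9092')
if cached:
    restored.update(pickle.loads(cached))
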
25 changes: 7 additions & 18 deletions kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py
@@ -11,11 +11,12 @@

from datadog_checks.base import AgentCheck, ConfigurationError

+from datadog_checks.kafka_consumer.datadog_agent import read_persistent_cache, write_persistent_cache
+
from .constants import BROKER_REQUESTS_BATCH_SIZE, KAFKA_INTERNAL_TOPICS
from six.moves import cPickle as pickle

MAX_TIMESTAMPS = 1000
-BROKER_TIMESTAMPS_FILENAME = '/tmp/broker_timestamps'

class NewKafkaConsumerCheck(object):
    """
@@ -28,7 +29,7 @@ def __init__(self, parent_check):
        self._parent_check = parent_check
        self._broker_requests_batch_size = self.instance.get('broker_requests_batch_size', BROKER_REQUESTS_BATCH_SIZE)
        self._kafka_client = None
-        self._broker_timestamps_filename = BROKER_TIMESTAMPS_FILENAME + self.instance.get('kafka_connect_str')
+        self._broker_timestamp_cache_key = 'broker_timestamps' + self.instance.get('kafka_connect_str', "")

    def __getattr__(self, item):
        try:
@@ -98,27 +99,15 @@ def check(self):
        self._collect_broker_metadata()

    def _load_broker_timestamps(self):
-        """Loads broker timestamps from disk."""
+        """Loads broker timestamps from the persistent cache."""
        self._broker_timestamps = defaultdict(dict)
        try:
-            with open(self._broker_timestamps_filename, 'rb') as f:
-                self._broker_timestamps.update(pickle.load(f))  # use update since defaultdict does not pickle
+            self._broker_timestamps.update(pickle.loads(read_persistent_cache(self._broker_timestamp_cache_key)))  # use update since defaultdict does not pickle
        except Exception:
-            # file may be corrupted from agent restart during writing
-            # remove the file, it will be created again further down
-            self.log.warning('Could not read broker timestamps on disk. Removing file...')
-            try:
-                os.remove(self._broker_timestamps_filename)
-            except OSError:
-                self.log.exception('Error removing broker timestamps file %s', self._broker_timestamps_filename)
-                pass
+            self.log.warning('Could not read broker timestamps from cache')

    def _save_broker_timestamps(self):
-        try:
-            with open(self._broker_timestamps_filename, 'wb') as f:
-                pickle.dump(self._broker_timestamps, f)
-        except Exception:
-            self.log.exception('Could not write the broker timestamps on the disk')
+        write_persistent_cache(self._broker_timestamp_cache_key, pickle.dumps(self._broker_timestamps))

    def _create_kafka_admin_client(self, api_version):
        """Return a KafkaAdminClient."""
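These hunks only change where the offset-to-timestamp samples live; collecting the samples and emitting kafka.consumer_lag_seconds happen in parts of the check that are not shown here. As a rough, hypothetical sketch of the bookkeeping such a metric needs, consistent with the MAX_TIMESTAMPS cap kept above and the (topic, partition) -> {offset: timestamp} shape used by the tests (record_highwater and broker_timestamps are illustrative names, not from the commit):

import time
from collections import defaultdict

MAX_TIMESTAMPS = 1000  # same cap as the module constant above

# (topic, partition) -> {offset: time at which that highwater offset was observed}
broker_timestamps = defaultdict(dict)


def record_highwater(topic, partition, highwater_offset):
    """Hypothetical sketch: remember when each highwater offset was seen, bounded per partition."""
    samples = broker_timestamps[(topic, partition)]
    samples[highwater_offset] = time.time()
    # keep only the most recent MAX_TIMESTAMPS sampled offsets
    for stale in sorted(samples)[:-MAX_TIMESTAMPS]:
        del samples[stale]

At reporting time, the produce time of a consumer's committed offset can then be estimated from these samples by interpolation (see test_get_interpolated_timestamp below) and turned into a lag expressed in seconds.
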
30 changes: 28 additions & 2 deletions kafka_consumer/tests/test_kafka_consumer.py
@@ -6,6 +6,8 @@

import mock
import pytest
+import pickle
+from collections import defaultdict

from datadog_checks.kafka_consumer import KafkaCheck
from datadog_checks.kafka_consumer.legacy_0_10_2 import LegacyKafkaCheck_0_10_2
@@ -20,9 +22,17 @@

BROKER_METRICS = ['kafka.broker_offset']

-CONSUMER_METRICS = ['kafka.consumer_offset', 'kafka.consumer_lag', "kafka.consumer_lag_seconds"]
+CONSUMER_METRICS = ['kafka.consumer_offset', 'kafka.consumer_lag']
+if not is_legacy_check():
+    CONSUMER_METRICS.append("kafka.consumer_lag_seconds")


+def mocked_read_persistent_cache(cache_key):
+    cached_offsets = defaultdict(dict)
+    cached_offsets[("marvel", 0)][25] = 150
+    cached_offsets[("marvel", 0)][45] = 250
+    return pickle.dumps(cached_offsets)

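The mock stands in for read_persistent_cache, which new_kafka_consumer imports from the shim added above; it returns pickled samples in the same (topic, partition) -> {offset: timestamp} shape the check caches. For example (an illustrative round trip, not part of the test module):

restored = pickle.loads(mocked_read_persistent_cache('any-key'))
assert restored[("marvel", 0)] == {25: 150, 45: 250}
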
@pytest.mark.unit
def test_uses_legacy_implementation_when_legacy_version_specified(kafka_instance):
    instance = copy.deepcopy(kafka_instance)
@@ -43,6 +53,20 @@ def test_uses_new_implementation_when_new_version_specified(kafka_instance):
    assert isinstance(kafka_consumer_check.sub_check, NewKafkaConsumerCheck)


+@pytest.mark.unit
+def test_get_interpolated_timestamp(kafka_instance):
+    instance = copy.deepcopy(kafka_instance)
+    instance['kafka_client_api_version'] = '0.10.2'
+    instance['sasl_kerberos_service_name'] = 'kafka'
+    check = KafkaCheck('kafka_consumer', {}, [instance])
+    check._init_check_based_on_kafka_version()
+    # at offset 0, time is 100s, at offset 10, time is 200sec.
+    # by interpolation, at offset 5, time should be 150sec.
+    assert check.sub_check._get_interpolated_timestamp({0: 100, 10: 200}, 5) == 150
+    assert check.sub_check._get_interpolated_timestamp({10: 100, 20: 200}, 5) == 50
+    assert check.sub_check._get_interpolated_timestamp({0: 100, 10: 200}, 15) == 250
+    assert check.sub_check._get_interpolated_timestamp({10: 200}, 15) is None

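The helper under test is not part of the hunks shown in this commit; the assertions pin down its behaviour: linear interpolation inside the sampled range, linear extrapolation from the two nearest samples outside it, and None when fewer than two samples exist. A standalone sketch consistent with those expectations (the real _get_interpolated_timestamp in the check may be implemented differently):

def get_interpolated_timestamp(offsets_to_ts, offset):
    """Estimate the timestamp of `offset` from sampled {offset: timestamp} points."""
    if len(offsets_to_ts) < 2:
        return None
    offsets = sorted(offsets_to_ts)
    higher = [o for o in offsets if o >= offset]
    lower = [o for o in offsets if o <= offset]
    if higher and lower:
        # target falls inside the sampled range: use the bracketing pair
        o1, o2 = lower[-1], higher[0]
        if o1 == o2:
            return offsets_to_ts[o1]
    elif higher:
        # target below the sampled range: extrapolate from the two smallest samples
        o1, o2 = higher[0], higher[1]
    else:
        # target above the sampled range: extrapolate from the two largest samples
        o1, o2 = lower[-2], lower[-1]
    t1, t2 = offsets_to_ts[o1], offsets_to_ts[o2]
    return t1 + (offset - o1) * (t2 - t1) / (o2 - o1)


assert get_interpolated_timestamp({0: 100, 10: 200}, 5) == 150   # interpolation
assert get_interpolated_timestamp({10: 100, 20: 200}, 5) == 50   # extrapolation below
assert get_interpolated_timestamp({0: 100, 10: 200}, 15) == 250  # extrapolation above
assert get_interpolated_timestamp({10: 200}, 15) is None         # not enough samples
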
@pytest.mark.unit
def test_gssapi(kafka_instance, dd_run_check):
    instance = copy.deepcopy(kafka_instance)
@@ -101,13 +125,13 @@ def test_tls_config_legacy(extra_config, expected_http_kwargs, kafka_instance):

@pytest.mark.integration
@pytest.mark.usefixtures('dd_environment')
+@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache)
def test_check_kafka(aggregator, kafka_instance, dd_run_check):
    """
    Testing Kafka_consumer check.
    """
    kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
    dd_run_check(kafka_consumer_check)

    assert_check_kafka(aggregator, kafka_instance['consumer_groups'])


@@ -168,6 +192,7 @@ def test_consumer_config_error(caplog, dd_run_check):
@pytest.mark.skipif(is_legacy_check(), reason="This test does not apply to the legacy check.")
@pytest.mark.integration
@pytest.mark.usefixtures('dd_environment')
+@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache)
def test_no_topics(aggregator, kafka_instance, dd_run_check):
    kafka_instance['consumer_groups'] = {'my_consumer': {}}
    kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
@@ -178,6 +203,7 @@ def test_no_topics(aggregator, kafka_instance, dd_run_check):

@pytest.mark.integration
@pytest.mark.usefixtures('dd_environment')
+@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache)
def test_no_partitions(aggregator, kafka_instance, dd_run_check):
    kafka_instance['consumer_groups'] = {'my_consumer': {'marvel': []}}
    kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])