Add new lag in seconds metric #11861

Merged: 14 commits, May 10, 2022
17 changes: 17 additions & 0 deletions kafka_consumer/datadog_checks/kafka_consumer/datadog_agent.py
@@ -0,0 +1,17 @@
# (C) Datadog, Inc. 2020-present
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)

from typing import Optional

try:
    from datadog_agent import read_persistent_cache, write_persistent_cache
except ImportError:
    # No-op fallbacks for environments where the Agent's datadog_agent module is unavailable (e.g. unit tests).

    def write_persistent_cache(key, value):
        # type: (str, str) -> None
        pass

    def read_persistent_cache(key):
        # type: (str) -> Optional[str]
        return ''
Comment on lines +7 to +17
@hithwen (Contributor) commented on May 9, 2022:

Why this? The feature has been in the Agent for years; the minimum supported base check already has it.

The PR author replied:

Sounds good. I copied it from another check, but you have more context than me on it.

The PR author replied:

If you think it's safe to switch to directly importing datadog_agent, I'm happy to change it.
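For reference, the direct-import alternative discussed here would look roughly like the sketch below. This is a minimal illustration only; the fallback to the stub module in datadog_checks.base.stubs is an assumption based on how other integrations import the Agent API, not something this PR ships.

try:
    # Provided by the Agent at runtime.
    import datadog_agent
except ImportError:
    # Assumed fallback so the module can still be imported outside the Agent (e.g. in unit tests).
    from datadog_checks.base.stubs import datadog_agent

# The check would then call the Agent API directly:
# datadog_agent.write_persistent_cache('broker_timestamps', '{}')
# cached = datadog_agent.read_persistent_cache('broker_timestamps')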

kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py
@@ -1,16 +1,21 @@
# (C) Datadog, Inc. 2019-present
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)
import json
from collections import defaultdict
from time import time

from kafka import errors as kafka_errors
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy, OffsetResponse
from kafka.structs import TopicPartition

from datadog_checks.base import AgentCheck, ConfigurationError
from datadog_checks.kafka_consumer.datadog_agent import read_persistent_cache, write_persistent_cache

from .constants import BROKER_REQUESTS_BATCH_SIZE, KAFKA_INTERNAL_TOPICS

MAX_TIMESTAMPS = 1000


class NewKafkaConsumerCheck(object):
"""
Expand All @@ -23,6 +28,7 @@ def __init__(self, parent_check):
self._parent_check = parent_check
self._broker_requests_batch_size = self.instance.get('broker_requests_batch_size', BROKER_REQUESTS_BATCH_SIZE)
self._kafka_client = None
self._broker_timestamp_cache_key = 'broker_timestamps' + "".join(sorted(self._custom_tags))

    def __getattr__(self, item):
        try:
@@ -61,6 +67,7 @@ def check(self):
self.log.exception("There was a problem collecting consumer offsets from Kafka.")
# don't raise because we might get valid broker offsets

self._load_broker_timestamps()
# Fetch the broker highwater offsets
try:
if len(self._consumer_offsets) < self._context_limit:
Expand All @@ -82,12 +89,27 @@ def check(self):
self._context_limit,
)

        self._save_broker_timestamps()

        # Report the metrics
        self._report_highwater_offsets(self._context_limit)
        self._report_consumer_offsets_and_lag(self._context_limit - len(self._highwater_offsets))

        self._collect_broker_metadata()

    def _load_broker_timestamps(self):
        """Loads broker timestamps from persistent cache."""
        self._broker_timestamps = defaultdict(dict)
        try:
            for topic_partition, content in json.loads(read_persistent_cache(self._broker_timestamp_cache_key)).items():
                for offset, timestamp in content.items():
                    self._broker_timestamps[topic_partition][int(offset)] = timestamp
        except Exception:
            self.log.warning('Could not read broker timestamps from cache')

    def _save_broker_timestamps(self):
        """Saves broker timestamps to persistent cache."""
        write_persistent_cache(self._broker_timestamp_cache_key, json.dumps(self._broker_timestamps))

    def _create_kafka_admin_client(self, api_version):
        """Return a KafkaAdminClient."""
        # TODO accept None (which inherits kafka-python default of localhost:9092)
@@ -170,6 +192,11 @@ def _highwater_offsets_callback(self, response):
                error_type = kafka_errors.for_code(error_code)
                if error_type is kafka_errors.NoError:
                    self._highwater_offsets[(topic, partition)] = offsets[0]
                    timestamps = self._broker_timestamps["{}_{}".format(topic, partition)]
                    timestamps[offsets[0]] = time()
                    # If there are too many timestamps, delete the oldest
                    if len(timestamps) > MAX_TIMESTAMPS:
                        del timestamps[min(timestamps)]
                elif error_type is kafka_errors.NotLeaderForPartitionError:
                    self.log.warning(
                        "Kafka broker returned %s (error_code %s) for topic %s, partition: %s. This should only happen "
@@ -233,8 +260,8 @@ def _report_consumer_offsets_and_lag(self, contexts_limit):
                        partition,
                    )
                    continue

                consumer_lag = self._highwater_offsets[(topic, partition)] - consumer_offset
                producer_offset = self._highwater_offsets[(topic, partition)]
                consumer_lag = producer_offset - consumer_offset
                if reported_contexts < contexts_limit:
                    self.gauge('consumer_lag', consumer_lag, tags=consumer_group_tags)
                    reported_contexts += 1
@@ -251,6 +278,17 @@ def _report_consumer_offsets_and_lag(self, contexts_limit):
                    self.send_event(title, message, consumer_group_tags, 'consumer_lag', key, severity="error")
                    self.log.debug(message)

                if reported_contexts >= contexts_limit:
                    continue
                timestamps = self._broker_timestamps["{}_{}".format(topic, partition)]
                # producer_timestamp is set in the same check, so it should never be None
                producer_timestamp = timestamps[producer_offset]
                consumer_timestamp = self._get_interpolated_timestamp(timestamps, consumer_offset)
                if consumer_timestamp is None or producer_timestamp is None:
                    continue
                lag = producer_timestamp - consumer_timestamp
                self.gauge('consumer_lag_seconds', lag, tags=consumer_group_tags)
                reported_contexts += 1
            else:
                if partitions is None:
                    msg = (
@@ -265,6 +303,35 @@ def _report_consumer_offsets_and_lag(self, contexts_limit):
                self.log.warning(msg, consumer_group, topic, partition)
                self.kafka_client._client.cluster.request_update()  # force metadata update on next poll()

    def _get_interpolated_timestamp(self, timestamps, offset):
        if offset in timestamps:
            return timestamps[offset]
        offsets = timestamps.keys()
        try:
            # Get the closest saved offsets on either side of the consumer offset
            offset_before = max([o for o in offsets if o < offset])
            offset_after = min([o for o in offsets if o > offset])
        except ValueError:
            if len(offsets) < 2:
                self.log.debug("Can't compute the timestamp as we don't have enough offset history yet")
                return None
            # We couldn't find offsets before and after the current consumer offset.
            # This happens when you start a consumer to replay data in the past:
            #   - We provision a consumer at t0 that will start consuming from t1 (t1 << t0).
            #   - It starts building a history of offset/timestamp pairs from the moment it started to run, i.e. t0.
            #   - So there is no offset/timestamp pair in the local history between t1 -> t0.
            # We'll take the min and max offsets available and assume the timestamp is an affine function
            # of the offset to compute an approximate broker timestamp corresponding to the current consumer offset.
            offset_before = min(offsets)
            offset_after = max(offsets)

        # We assume that the timestamp is an affine function of the offset
        timestamp_before = timestamps[offset_before]
        timestamp_after = timestamps[offset_after]
        slope = (timestamp_after - timestamp_before) / float(offset_after - offset_before)
        timestamp = slope * (offset - offset_after) + timestamp_after
        return timestamp
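    # Worked example of the interpolation above (it mirrors the unit test added in this PR):
    # with saved pairs {0: 100, 10: 200} and a consumer offset of 5,
    # slope = (200 - 100) / float(10 - 0) = 10.0, so the interpolated
    # timestamp is 10.0 * (5 - 10) + 200 = 150.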

    def _get_consumer_offsets(self):
        """Fetch Consumer Group offsets from Kafka.

1 change: 1 addition & 0 deletions kafka_consumer/metadata.csv
@@ -2,3 +2,4 @@ metric_name,metric_type,interval,unit_name,per_unit_name,description,orientation
kafka.broker_offset,gauge,,offset,,Current message offset on broker.,0,kafka,broker offset,
kafka.consumer_lag,gauge,,offset,,Lag in messages between consumer and broker.,-1,kafka,consumer lag,
kafka.consumer_offset,gauge,,offset,,Current message offset on consumer.,0,kafka,consumer offset,
kafka.consumer_lag_seconds,gauge,,second,,Lag in seconds between consumer and broker.,-1,kafka,consumer time lag,
47 changes: 43 additions & 4 deletions kafka_consumer/tests/test_kafka_consumer.py
@@ -2,7 +2,9 @@
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)
import copy
import json
import os
from collections import defaultdict

import mock
import pytest
@@ -23,6 +25,17 @@
CONSUMER_METRICS = ['kafka.consumer_offset', 'kafka.consumer_lag']


def mocked_read_persistent_cache(cache_key):
    cached_offsets = defaultdict(dict)
    cached_offsets["marvel_0"][25] = 150
    cached_offsets["marvel_0"][40] = 200
    return json.dumps(cached_offsets)


def mocked_time():
    return 400


@pytest.mark.unit
def test_uses_legacy_implementation_when_legacy_version_specified(kafka_instance):
    instance = copy.deepcopy(kafka_instance)
@@ -43,6 +56,21 @@ def test_uses_new_implementation_when_new_version_specified(kafka_instance):
    assert isinstance(kafka_consumer_check.sub_check, NewKafkaConsumerCheck)


@pytest.mark.unit
def test_get_interpolated_timestamp(kafka_instance):
    instance = copy.deepcopy(kafka_instance)
    instance['kafka_client_api_version'] = '0.10.2'
    instance['sasl_kerberos_service_name'] = 'kafka'
    check = KafkaCheck('kafka_consumer', {}, [instance])
    check._init_check_based_on_kafka_version()
    # At offset 0 the time is 100 s; at offset 10 it is 200 s.
    # By interpolation, at offset 5 the time should be 150 s.
    assert check.sub_check._get_interpolated_timestamp({0: 100, 10: 200}, 5) == 150
    assert check.sub_check._get_interpolated_timestamp({10: 100, 20: 200}, 5) == 50
    assert check.sub_check._get_interpolated_timestamp({0: 100, 10: 200}, 15) == 250
    assert check.sub_check._get_interpolated_timestamp({10: 200}, 15) is None

@pytest.mark.unit
def test_gssapi(kafka_instance, dd_run_check):
    instance = copy.deepcopy(kafka_instance)
@@ -101,13 +129,14 @@ def test_tls_config_legacy(extra_config, expected_http_kwargs, kafka_instance):

@pytest.mark.integration
@pytest.mark.usefixtures('dd_environment')
@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache)
@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.time', mocked_time)
def test_check_kafka(aggregator, kafka_instance, dd_run_check):
"""
Testing Kafka_consumer check.
"""
kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
dd_run_check(kafka_consumer_check)

assert_check_kafka(aggregator, kafka_instance['consumer_groups'])


@@ -137,11 +166,10 @@ def test_check_kafka_metrics_limit(aggregator, kafka_instance, dd_run_check):
@pytest.mark.e2e
def test_e2e(dd_agent_check, kafka_instance):
    aggregator = dd_agent_check(kafka_instance)

    assert_check_kafka(aggregator, kafka_instance['consumer_groups'])
    assert_check_kafka(aggregator, kafka_instance['consumer_groups'], e2e=True)


def assert_check_kafka(aggregator, consumer_groups):
def assert_check_kafka(aggregator, consumer_groups, e2e=False):
    for name, consumer_group in consumer_groups.items():
        for topic, partitions in consumer_group.items():
            for partition in partitions:
@@ -150,6 +178,13 @@ def assert_check_kafka(aggregator, consumer_groups):
                    aggregator.assert_metric(mname, tags=tags, at_least=1)
                for mname in CONSUMER_METRICS:
                    aggregator.assert_metric(mname, tags=tags + ["consumer_group:{}".format(name)], at_least=1)
                if not is_legacy_check() and not e2e:
                    # In the e2e test, Kafka is not actively receiving data, so the broker timestamp history
                    # never has more than one entry and we can't interpolate a consumer timestamp.
                    aggregator.assert_metric(
                        "kafka.consumer_lag_seconds", tags=tags + ["consumer_group:{}".format(name)], at_least=1
                    )

    aggregator.assert_all_metrics_covered()

@@ -168,6 +203,8 @@ def test_consumer_config_error(caplog, dd_run_check):
@pytest.mark.skipif(is_legacy_check(), reason="This test does not apply to the legacy check.")
@pytest.mark.integration
@pytest.mark.usefixtures('dd_environment')
@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache)
@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.time', mocked_time)
def test_no_topics(aggregator, kafka_instance, dd_run_check):
    kafka_instance['consumer_groups'] = {'my_consumer': {}}
    kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
@@ -178,6 +215,8 @@ def test_no_partitions(aggregator, kafka_instance, dd_run_check):

@pytest.mark.integration
@pytest.mark.usefixtures('dd_environment')
@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache)
@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.time', mocked_time)
def test_no_partitions(aggregator, kafka_instance, dd_run_check):
    kafka_instance['consumer_groups'] = {'my_consumer': {'marvel': []}}
    kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])