kafka_consumer: Add new metric: lag in seconds
piochelepiotr committed Apr 20, 2022
1 parent d996317 commit 5047ca1
Showing 3 changed files with 83 additions and 7 deletions.
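The new `kafka.consumer_lag_seconds` gauge estimates how far, in time, a consumer group trails the latest produced message. On every run the check records when each broker highwater offset was observed, persists that history to disk between runs, and reports the difference between the observation time of the current highwater offset and the (interpolated) time at which the consumer's offset was the highwater offset. A minimal sketch of the bookkeeping, using illustrative names (`record_highwater`, `estimate_lag_seconds`) that are not part of the check:

from time import time

MAX_TIMESTAMPS = 1000  # cap on the per-partition history, as in the check

# (topic, partition) -> {highwater offset: time at which it was observed}
broker_timestamps = {}


def record_highwater(topic, partition, offset):
    """Remember when each highwater offset was seen (illustrative helper)."""
    history = broker_timestamps.setdefault((topic, partition), {})
    history[offset] = time()
    if len(history) > MAX_TIMESTAMPS:
        del history[min(history)]  # drop the oldest entry to bound memory


def estimate_lag_seconds(topic, partition, consumer_offset, producer_offset):
    """Seconds between the consumer's position and the latest highwater offset.

    The real check interpolates when consumer_offset has no exact entry in the
    history (see _get_interpolated_timestamp below); this sketch assumes both
    offsets were recorded.
    """
    history = broker_timestamps[(topic, partition)]
    return history[producer_offset] - history[consumer_offset]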
80 changes: 78 additions & 2 deletions kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py
@@ -2,6 +2,8 @@
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)
from collections import defaultdict
from time import time
import os

from kafka import errors as kafka_errors
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy, OffsetResponse
@@ -10,7 +12,10 @@
from datadog_checks.base import AgentCheck, ConfigurationError

from .constants import BROKER_REQUESTS_BATCH_SIZE, KAFKA_INTERNAL_TOPICS
from six.moves import cPickle as pickle

MAX_TIMESTAMPS = 1000
BROKER_TIMESTAMPS_FILENAME = '/tmp/broker_timestamps'

class NewKafkaConsumerCheck(object):
"""
@@ -23,6 +28,7 @@ def __init__(self, parent_check):
self._parent_check = parent_check
self._broker_requests_batch_size = self.instance.get('broker_requests_batch_size', BROKER_REQUESTS_BATCH_SIZE)
self._kafka_client = None
self._broker_timestamps_filename = BROKER_TIMESTAMPS_FILENAME + self.instance.get('kafka_connect_str')

def __getattr__(self, item):
try:
@@ -61,6 +67,7 @@ def check(self):
self.log.exception("There was a problem collecting consumer offsets from Kafka.")
# don't raise because we might get valid broker offsets

self._load_broker_timestamps()
# Fetch the broker highwater offsets
try:
if len(self._consumer_offsets) < self._context_limit:
@@ -82,12 +89,37 @@
self._context_limit,
)

self._save_broker_timestamps()

# Report the metrics
self._report_highwater_offsets(self._context_limit)
self._report_consumer_offsets_and_lag(self._context_limit - len(self._highwater_offsets))

self._collect_broker_metadata()

def _load_broker_timestamps(self):
"""Loads broker timestamps from disk."""
self._broker_timestamps = defaultdict(dict)
try:
with open(self._broker_timestamps_filename, 'rb') as f:
self._broker_timestamps.update(pickle.load(f))  # merge into the defaultdict so it keeps its default factory
except Exception:
# file may be corrupted from agent restart during writing
# remove the file, it will be created again further down
self.log.warning('Could not read broker timestamps from disk. Removing file...')
try:
os.remove(self._broker_timestamps_filename)
except OSError:
self.log.exception('Error removing broker timestamps file %s', self._broker_timestamps_filename)

def _save_broker_timestamps(self):
try:
with open(self._broker_timestamps_filename, 'wb') as f:
pickle.dump(self._broker_timestamps, f)
except Exception:
self.log.exception('Could not write the broker timestamps to disk')

def _create_kafka_admin_client(self, api_version):
"""Return a KafkaAdminClient."""
# TODO accept None (which inherits kafka-python default of localhost:9092)
@@ -170,6 +202,11 @@ def _highwater_offsets_callback(self, response):
error_type = kafka_errors.for_code(error_code)
if error_type is kafka_errors.NoError:
self._highwater_offsets[(topic, partition)] = offsets[0]
timestamps = self._broker_timestamps[(topic, partition)]
timestamps[offsets[0]] = time()
# If there are too many timestamps, delete the oldest to bound memory
if len(timestamps) > MAX_TIMESTAMPS:
del timestamps[min(timestamps)]
elif error_type is kafka_errors.NotLeaderForPartitionError:
self.log.warning(
"Kafka broker returned %s (error_code %s) for topic %s, partition: %s. This should only happen "
@@ -233,8 +270,8 @@ def _report_consumer_offsets_and_lag(self, contexts_limit):
partition,
)
continue

consumer_lag = self._highwater_offsets[(topic, partition)] - consumer_offset
producer_offset = self._highwater_offsets[(topic, partition)]
consumer_lag = producer_offset - consumer_offset
if reported_contexts < contexts_limit:
self.gauge('consumer_lag', consumer_lag, tags=consumer_group_tags)
reported_contexts += 1
@@ -251,6 +288,16 @@ def _report_consumer_offsets_and_lag(self, contexts_limit):
self.send_event(title, message, consumer_group_tags, 'consumer_lag', key, severity="error")
self.log.debug(message)

if reported_contexts >= contexts_limit:
continue
timestamps = self._broker_timestamps[(topic, partition)]
# Time at which the latest highwater offset was observed, and the (interpolated)
# time at which the consumer's current offset was the highwater offset.
producer_timestamp = timestamps[producer_offset]
consumer_timestamp = self._get_interpolated_timestamp(timestamps, consumer_offset)
if consumer_timestamp is None:
continue
lag = producer_timestamp - consumer_timestamp
self.gauge('consumer_lag_seconds', lag, tags=consumer_group_tags)
reported_contexts += 1
else:
if partitions is None:
msg = (
@@ -265,6 +312,35 @@ def _report_consumer_offsets_and_lag(self, contexts_limit):
self.log.warning(msg, consumer_group, topic, partition)
self.kafka_client._client.cluster.request_update() # force metadata update on next poll()

def _get_interpolated_timestamp(self, timestamps, offset):
if offset in timestamps:
return timestamps[offset]
offsets = timestamps.keys()
try:
# Get the closest recorded offsets on either side of the consumer offset
offset_before = max([o for o in offsets if o < offset])
offset_after = min([o for o in offsets if o > offset])
except ValueError:
if len(offsets) < 2:
self.log.debug("Can't compute the timestamp as we don't have enough offsets history yet")
return None
# We couldn't find offsets before and after the current consumer offset.
# This happens when you start a consumer to replay data in the past:
# - We provision a consumer at t0 that will start consuming from t1 (t1 << t0).
# - It starts building a history of offset/timestamp pairs from the moment it started to run, i.e. t0.
# - So there is no offset/timestamp pair in the local history between t1 and t0.
# We'll take the min and max offsets available and assume the timestamp is an affine function
# of the offset to compute an approximate broker timestamp corresponding to the current consumer offset.
offset_before = min(offsets)
offset_after = max(offsets)

# We assume that the timestamp is an affine function of the offset
timestamp_before = timestamps[offset_before]
timestamp_after = timestamps[offset_after]
slope = (timestamp_after - timestamp_before) / float(offset_after - offset_before)
timestamp = slope * (offset - offset_after) + timestamp_after
return timestamp

def _get_consumer_offsets(self):
"""Fetch Consumer Group offsets from Kafka.
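For a concrete sense of _get_interpolated_timestamp above, a small worked example with made-up offsets and observation times; the real method reads the same per-partition history that _highwater_offsets_callback fills in:

# Hypothetical history for one partition: highwater offset -> time it was observed.
timestamps = {1000: 100.0, 2000: 160.0}

# The consumer sits at offset 1500, between the two recorded points.
offset_before, offset_after = 1000, 2000
slope = (timestamps[offset_after] - timestamps[offset_before]) / float(offset_after - offset_before)
# slope = 0.06 seconds per offset
consumer_timestamp = slope * (1500 - offset_after) + timestamps[offset_after]
# consumer_timestamp = 130.0

# With the highwater offset 2000 observed at t=160.0, the consumer is roughly
# 160.0 - 130.0 = 30 seconds behind.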
2 changes: 1 addition & 1 deletion kafka_consumer/tests/test_kafka_consumer.py
@@ -20,7 +20,7 @@

BROKER_METRICS = ['kafka.broker_offset']

CONSUMER_METRICS = ['kafka.consumer_offset', 'kafka.consumer_lag']
CONSUMER_METRICS = ['kafka.consumer_offset', 'kafka.consumer_lag', 'kafka.consumer_lag_seconds']


@pytest.mark.unit
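A sketch of how the new gauge could be asserted in a test, assuming the suite's existing fixtures (aggregator, kafka_instance, dd_run_check) and the KafkaCheck entry point; the fixture names here follow the surrounding tests and may differ:

import pytest

from datadog_checks.kafka_consumer import KafkaCheck


@pytest.mark.integration
def test_consumer_lag_seconds_is_submitted(aggregator, kafka_instance, dd_run_check):
    # Run the check once against the test instance and verify that the new
    # time-based lag gauge is reported alongside the offset-based one.
    check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
    dd_run_check(check)

    aggregator.assert_metric('kafka.consumer_lag')
    aggregator.assert_metric('kafka.consumer_lag_seconds')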
8 changes: 4 additions & 4 deletions kafka_consumer/tox.ini
@@ -1,18 +1,18 @@
[tox]
isolated_build = true
minversion = 2.0
basepython = py38
basepython = py39
envlist =
py27-{0.9,latest}-{kafka,zk}
py38-{0.9,0.11,1.1,2.3,latest}-{kafka,zk}
py39-{0.9,0.11,1.1,2.3,latest}-{kafka,zk}

[testenv]
ensure_default_envdir = true
envdir =
py27: {toxworkdir}/py27
py38: {toxworkdir}/py38
py39: {toxworkdir}/py39
description =
py{27,38}-{0.9,latest}-{kafka,zk}: e2e ready
py{27,39}-{0.9,latest}-{kafka,zk}: e2e ready
dd_check_style = true
usedevelop = true
platform = linux|darwin|win32
