diff --git a/kafka_consumer/datadog_checks/kafka_consumer/datadog_agent.py b/kafka_consumer/datadog_checks/kafka_consumer/datadog_agent.py
new file mode 100644
index 0000000000000..1548d309c38c6
--- /dev/null
+++ b/kafka_consumer/datadog_checks/kafka_consumer/datadog_agent.py
@@ -0,0 +1,17 @@
+# (C) Datadog, Inc. 2020-present
+# All rights reserved
+# Licensed under Simplified BSD License (see LICENSE)
+
+from typing import Optional
+
+try:
+    from datadog_agent import read_persistent_cache, write_persistent_cache
+except ImportError:
+
+    def write_persistent_cache(key, value):
+        # type: (str, str) -> None
+        pass
+
+    def read_persistent_cache(key):
+        # type: (str) -> Optional[str]
+        return ''
diff --git a/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py
index aa7ce93b1a283..e1ea4ecf8b320 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py
+++ b/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py
@@ -1,16 +1,21 @@
 # (C) Datadog, Inc. 2019-present
 # All rights reserved
 # Licensed under Simplified BSD License (see LICENSE)
+import json
 from collections import defaultdict
+from time import time
 
 from kafka import errors as kafka_errors
 from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy, OffsetResponse
 from kafka.structs import TopicPartition
 
 from datadog_checks.base import AgentCheck, ConfigurationError
+from datadog_checks.kafka_consumer.datadog_agent import read_persistent_cache, write_persistent_cache
 
 from .constants import BROKER_REQUESTS_BATCH_SIZE, KAFKA_INTERNAL_TOPICS
 
+MAX_TIMESTAMPS = 1000
+
 
 class NewKafkaConsumerCheck(object):
     """
@@ -23,6 +28,7 @@ def __init__(self, parent_check):
         self._parent_check = parent_check
         self._broker_requests_batch_size = self.instance.get('broker_requests_batch_size', BROKER_REQUESTS_BATCH_SIZE)
         self._kafka_client = None
+        self._broker_timestamp_cache_key = 'broker_timestamps' + "".join(sorted(self._custom_tags))
 
     def __getattr__(self, item):
         try:
@@ -61,6 +67,7 @@ def check(self):
             self.log.exception("There was a problem collecting consumer offsets from Kafka.")
             # don't raise because we might get valid broker offsets
 
+        self._load_broker_timestamps()
         # Fetch the broker highwater offsets
         try:
             if len(self._consumer_offsets) < self._context_limit:
@@ -82,12 +89,27 @@ def check(self):
                 self._context_limit,
             )
 
+        self._save_broker_timestamps()
+
         # Report the metrics
         self._report_highwater_offsets(self._context_limit)
         self._report_consumer_offsets_and_lag(self._context_limit - len(self._highwater_offsets))
 
         self._collect_broker_metadata()
 
+    def _load_broker_timestamps(self):
+        """Load broker timestamps from the persistent cache."""
+        self._broker_timestamps = defaultdict(dict)
+        try:
+            for topic_partition, content in json.loads(read_persistent_cache(self._broker_timestamp_cache_key)).items():
+                for offset, timestamp in content.items():
+                    self._broker_timestamps[topic_partition][int(offset)] = timestamp
+        except Exception as e:
+            self.log.warning('Could not read broker timestamps from cache: %s', str(e))
+
+    def _save_broker_timestamps(self):
+        write_persistent_cache(self._broker_timestamp_cache_key, json.dumps(self._broker_timestamps))
+
     def _create_kafka_admin_client(self, api_version):
         """Return a KafkaAdminClient."""
         # TODO accept None (which inherits kafka-python default of localhost:9092)
@@ -170,6 +192,11 @@ def _highwater_offsets_callback(self, response):
                 error_type = kafka_errors.for_code(error_code)
                 if error_type is kafka_errors.NoError:
                     self._highwater_offsets[(topic, partition)] = offsets[0]
+                    timestamps = self._broker_timestamps["{}_{}".format(topic, partition)]
+                    timestamps[offsets[0]] = time()
+                    # If there are too many timestamps, delete the oldest one
+                    if len(timestamps) > MAX_TIMESTAMPS:
+                        del timestamps[min(timestamps)]
                 elif error_type is kafka_errors.NotLeaderForPartitionError:
                     self.log.warning(
                         "Kafka broker returned %s (error_code %s) for topic %s, partition: %s. This should only happen "
@@ -233,8 +260,8 @@ def _report_consumer_offsets_and_lag(self, contexts_limit):
                        partition,
                    )
                    continue
-
-                consumer_lag = self._highwater_offsets[(topic, partition)] - consumer_offset
+                producer_offset = self._highwater_offsets[(topic, partition)]
+                consumer_lag = producer_offset - consumer_offset
                 if reported_contexts < contexts_limit:
                     self.gauge('consumer_lag', consumer_lag, tags=consumer_group_tags)
                     reported_contexts += 1
@@ -251,6 +278,17 @@ def _report_consumer_offsets_and_lag(self, contexts_limit):
                     self.send_event(title, message, consumer_group_tags, 'consumer_lag', key, severity="error")
                     self.log.debug(message)
 
+                if reported_contexts >= contexts_limit:
+                    continue
+                timestamps = self._broker_timestamps["{}_{}".format(topic, partition)]
+                # producer_timestamp is set earlier in the same check run, so it should never be None
+                producer_timestamp = timestamps[producer_offset]
+                consumer_timestamp = self._get_interpolated_timestamp(timestamps, consumer_offset)
+                if consumer_timestamp is None or producer_timestamp is None:
+                    continue
+                lag = producer_timestamp - consumer_timestamp
+                self.gauge('consumer_lag_seconds', lag, tags=consumer_group_tags)
+                reported_contexts += 1
             else:
                 if partitions is None:
                     msg = (
@@ -265,6 +303,35 @@ def _report_consumer_offsets_and_lag(self, contexts_limit):
                 self.log.warning(msg, consumer_group, topic, partition)
                 self.kafka_client._client.cluster.request_update()  # force metadata update on next poll()
 
+    def _get_interpolated_timestamp(self, timestamps, offset):
+        if offset in timestamps:
+            return timestamps[offset]
+        offsets = timestamps.keys()
+        try:
+            # Get the closest saved offsets to the consumer_offset
+            offset_before = max([o for o in offsets if o < offset])
+            offset_after = min([o for o in offsets if o > offset])
+        except ValueError:
+            if len(offsets) < 2:
+                self.log.debug("Can't compute the timestamp as we don't have enough offset history yet")
+                return None
+            # We couldn't find offsets before and after the current consumer offset.
+            # This happens when you start a consumer to replay data in the past:
+            #   - We provision a consumer at t0 that will start consuming from t1 (t1 << t0).
+            #   - It starts building a history of offset/timestamp pairs from the moment it started to run, i.e. t0.
+            #   - So there is no offset/timestamp pair in the local history between t1 and t0.
+            # We'll take the min and max offsets available and assume the timestamp is an affine function
+            # of the offset to compute an approximate broker timestamp corresponding to the current consumer offset.
+            offset_before = min(offsets)
+            offset_after = max(offsets)
+
+        # We assume that the timestamp is an affine function of the offset
+        timestamp_before = timestamps[offset_before]
+        timestamp_after = timestamps[offset_after]
+        slope = (timestamp_after - timestamp_before) / float(offset_after - offset_before)
+        timestamp = slope * (offset - offset_after) + timestamp_after
+        return timestamp
+
     def _get_consumer_offsets(self):
         """Fetch Consumer Group offsets from Kafka.
 
diff --git a/kafka_consumer/metadata.csv b/kafka_consumer/metadata.csv
index 11ec6aec16bbb..2461c74b314cb 100644
--- a/kafka_consumer/metadata.csv
+++ b/kafka_consumer/metadata.csv
@@ -2,3 +2,4 @@ metric_name,metric_type,interval,unit_name,per_unit_name,description,orientation
 kafka.broker_offset,gauge,,offset,,Current message offset on broker.,0,kafka,broker offset,
 kafka.consumer_lag,gauge,,offset,,Lag in messages between consumer and broker.,-1,kafka,consumer lag,
 kafka.consumer_offset,gauge,,offset,,Current message offset on consumer.,0,kafka,consumer offset,
+kafka.consumer_lag_seconds,gauge,,second,,Lag in seconds between consumer and broker.,-1,kafka,consumer time lag,
diff --git a/kafka_consumer/tests/test_kafka_consumer.py b/kafka_consumer/tests/test_kafka_consumer.py
index 075322494d42c..b2f87706a9cf0 100644
--- a/kafka_consumer/tests/test_kafka_consumer.py
+++ b/kafka_consumer/tests/test_kafka_consumer.py
@@ -2,7 +2,9 @@
 # All rights reserved
 # Licensed under Simplified BSD License (see LICENSE)
 import copy
+import json
 import os
+from collections import defaultdict
 
 import mock
 import pytest
@@ -23,6 +25,17 @@
 CONSUMER_METRICS = ['kafka.consumer_offset', 'kafka.consumer_lag']
 
 
+def mocked_read_persistent_cache(cache_key):
+    cached_offsets = defaultdict(dict)
+    cached_offsets["marvel_0"][25] = 150
+    cached_offsets["marvel_0"][40] = 200
+    return json.dumps(cached_offsets)
+
+
+def mocked_time():
+    return 400
+
+
 @pytest.mark.unit
 def test_uses_legacy_implementation_when_legacy_version_specified(kafka_instance):
     instance = copy.deepcopy(kafka_instance)
@@ -43,6 +56,21 @@ def test_uses_new_implementation_when_new_version_specified(kafka_instance):
     assert isinstance(kafka_consumer_check.sub_check, NewKafkaConsumerCheck)
 
 
+@pytest.mark.unit
+def test_get_interpolated_timestamp(kafka_instance):
+    instance = copy.deepcopy(kafka_instance)
+    instance['kafka_client_api_version'] = '0.10.2'
+    instance['sasl_kerberos_service_name'] = 'kafka'
+    check = KafkaCheck('kafka_consumer', {}, [instance])
+    check._init_check_based_on_kafka_version()
+    # At offset 0, time is 100s; at offset 10, time is 200s.
+    # By interpolation, at offset 5, time should be 150s.
+    assert check.sub_check._get_interpolated_timestamp({0: 100, 10: 200}, 5) == 150
+    assert check.sub_check._get_interpolated_timestamp({10: 100, 20: 200}, 5) == 50
+    assert check.sub_check._get_interpolated_timestamp({0: 100, 10: 200}, 15) == 250
+    assert check.sub_check._get_interpolated_timestamp({10: 200}, 15) is None
+
+
 @pytest.mark.unit
 def test_gssapi(kafka_instance, dd_run_check):
     instance = copy.deepcopy(kafka_instance)
@@ -101,13 +129,14 @@ def test_tls_config_legacy(extra_config, expected_http_kwargs, kafka_instance):
 
 @pytest.mark.integration
 @pytest.mark.usefixtures('dd_environment')
+@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache)
+@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.time', mocked_time)
 def test_check_kafka(aggregator, kafka_instance, dd_run_check):
     """
     Testing Kafka_consumer check.
     """
     kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
     dd_run_check(kafka_consumer_check)
-
     assert_check_kafka(aggregator, kafka_instance['consumer_groups'])
 
 
@@ -137,11 +166,10 @@ def test_check_kafka_metrics_limit(aggregator, kafka_instance, dd_run_check):
 @pytest.mark.e2e
 def test_e2e(dd_agent_check, kafka_instance):
     aggregator = dd_agent_check(kafka_instance)
-
-    assert_check_kafka(aggregator, kafka_instance['consumer_groups'])
+    assert_check_kafka(aggregator, kafka_instance['consumer_groups'], e2e=True)
 
 
-def assert_check_kafka(aggregator, consumer_groups):
+def assert_check_kafka(aggregator, consumer_groups, e2e=False):
     for name, consumer_group in consumer_groups.items():
         for topic, partitions in consumer_group.items():
             for partition in partitions:
@@ -150,6 +178,13 @@ def assert_check_kafka(aggregator, consumer_groups):
                     aggregator.assert_metric(mname, tags=tags, at_least=1)
                 for mname in CONSUMER_METRICS:
                     aggregator.assert_metric(mname, tags=tags + ["consumer_group:{}".format(name)], at_least=1)
+                if not is_legacy_check() and not e2e:
+                    # In the e2e test, Kafka is not actively receiving data, so the broker timestamp
+                    # history never holds more than one entry per partition and we cannot interpolate
+                    # to get the consumer timestamp.
+                    aggregator.assert_metric(
+                        "kafka.consumer_lag_seconds", tags=tags + ["consumer_group:{}".format(name)], at_least=1
+                    )
 
     aggregator.assert_all_metrics_covered()
 
@@ -168,6 +203,8 @@ def test_consumer_config_error(caplog, dd_run_check):
 @pytest.mark.skipif(is_legacy_check(), reason="This test does not apply to the legacy check.")
 @pytest.mark.integration
 @pytest.mark.usefixtures('dd_environment')
+@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache)
+@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.time', mocked_time)
 def test_no_topics(aggregator, kafka_instance, dd_run_check):
     kafka_instance['consumer_groups'] = {'my_consumer': {}}
     kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])
@@ -178,6 +215,8 @@
 
 @pytest.mark.integration
 @pytest.mark.usefixtures('dd_environment')
+@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.read_persistent_cache', mocked_read_persistent_cache)
+@mock.patch('datadog_checks.kafka_consumer.new_kafka_consumer.time', mocked_time)
 def test_no_partitions(aggregator, kafka_instance, dd_run_check):
     kafka_instance['consumer_groups'] = {'my_consumer': {'marvel': []}}
     kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance])