Skip to content

Commit

Permalink
[AGENT-9940] Add debug logs
Browse files Browse the repository at this point in the history
  • Loading branch information
vivek-datadog committed Jul 18, 2023
1 parent e97c04d commit d41e638
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 1 deletion.
2 changes: 1 addition & 1 deletion kafka_consumer/datadog_checks/kafka_consumer/__about__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)

__version__ = "3.1.2"
__version__ = "3.1.2-beta.1"
11 changes: 11 additions & 0 deletions kafka_consumer/datadog_checks/kafka_consumer/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def __get_authentication_config(self):
return config

def get_highwater_offsets(self, consumer_offsets):
self.log.debug('Getting highwater offsets')
highwater_offsets = {}
topics_with_consumer_offset = {}
if not self.config._monitor_all_broker_highwatermarks:
Expand All @@ -84,6 +85,7 @@ def get_highwater_offsets(self, consumer_offsets):
for consumer_group in consumer_offsets.items():
consumer = self.__create_consumer(consumer_group)
topics = consumer.list_topics(timeout=self.config._request_timeout)
self.log.debug('CONSUMER GROUP: %s', consumer_group)

for topic in topics.topics:
topic_partitions = [
Expand All @@ -98,8 +100,13 @@ def get_highwater_offsets(self, consumer_offsets):
):
_, high_offset = consumer.get_watermark_offsets(topic_partition)

self.log.debug('TOPIC: %s', topic)
self.log.debug('PARTITION: %s', partition)
self.log.debug('HIGHWATER OFFSET: %s', high_offset)

highwater_offsets[(topic, partition)] = high_offset

self.log.debug('Got %s highwater offsets', len(highwater_offsets))
return highwater_offsets

def get_partitions_for_topic(self, topic):
Expand All @@ -119,11 +126,14 @@ def request_metadata_update(self):

def get_consumer_offsets(self):
# {(consumer_group, topic, partition): offset}
self.log.debug('Getting consumer offsets')
consumer_offsets = {}

consumer_groups = self._get_consumer_groups()
self.log.debug('Identified %s consumer groups', len(consumer_groups))

futures = self._get_consumer_offset_futures(consumer_groups)
self.log.debug('%s futures to be waited on', len(futures))

for future in as_completed(futures):
try:
Expand Down Expand Up @@ -165,6 +175,7 @@ def get_consumer_offsets(self):
if self.config._consumer_groups_compiled_regex.match(to_match):
consumer_offsets[(consumer_group, topic, partition)] = offset

self.log.debug('Got %s consumer offsets', len(consumer_offsets))
return consumer_offsets

def _get_consumer_groups(self):
Expand Down
12 changes: 12 additions & 0 deletions kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ def check(self, _):
raise

total_contexts = len(consumer_offsets) + len(highwater_offsets)
self.log.debug(
"Total contexts: %s, Consumer offsets: %s, Highwater offsets: %s",
total_contexts,
consumer_offsets,
highwater_offsets,
)
if total_contexts >= self._context_limit:
self.warning(
"""Discovered %s metric contexts - this exceeds the maximum number of %s contexts permitted by the
Expand All @@ -79,9 +85,11 @@ def report_highwater_offsets(self, highwater_offsets, contexts_limit):
broker_tags = ['topic:%s' % topic, 'partition:%s' % partition]
broker_tags.extend(self.config._custom_tags)
self.gauge('broker_offset', highwater_offset, tags=broker_tags)
self.log.debug('%s highwater offset reported with %s tags', highwater_offset, broker_tags)
reported_contexts += 1
if reported_contexts == contexts_limit:
return
self.log.debug('%s highwater offsets reported', reported_contexts)

def report_consumer_offsets_and_lag(self, consumer_offsets, highwater_offsets, contexts_limit):
"""Report the consumer offsets and consumer lag."""
Expand All @@ -94,6 +102,7 @@ def report_consumer_offsets_and_lag(self, consumer_offsets, highwater_offsets, c
str(reported_contexts),
str(contexts_limit),
)
self.log.debug('%s consumer offsets reported', reported_contexts)
return
consumer_group_tags = ['topic:%s' % topic, 'partition:%s' % partition, 'consumer_group:%s' % consumer_group]
consumer_group_tags.extend(self.config._custom_tags)
Expand All @@ -104,6 +113,7 @@ def report_consumer_offsets_and_lag(self, consumer_offsets, highwater_offsets, c
# report consumer offset if the partition is valid because even if leaderless the consumer offset will
# be valid once the leader failover completes
self.gauge('consumer_offset', consumer_offset, tags=consumer_group_tags)
self.log.debug('%s consumer offset reported with %s tags', consumer_offset, consumer_group_tags)
reported_contexts += 1

if (topic, partition) not in highwater_offsets:
Expand All @@ -119,6 +129,7 @@ def report_consumer_offsets_and_lag(self, consumer_offsets, highwater_offsets, c
consumer_lag = producer_offset - consumer_offset
if reported_contexts < contexts_limit:
self.gauge('consumer_lag', consumer_lag, tags=consumer_group_tags)
self.log.debug('%s consumer lag reported with %s tags', consumer_lag, consumer_group_tags)
reported_contexts += 1

if consumer_lag < 0:
Expand All @@ -145,6 +156,7 @@ def report_consumer_offsets_and_lag(self, consumer_offsets, highwater_offsets, c
)
self.log.warning(msg, consumer_group, topic, partition)
self.client.request_metadata_update() # force metadata update on next poll()
self.log.debug('%s consumer offsets reported', reported_contexts)

def send_event(self, title, text, tags, event_type, aggregation_key, severity='info'):
"""Emit an event to the Datadog Event Stream."""
Expand Down

0 comments on commit d41e638

Please sign in to comment.