Optimize highwater offset collection #15285

Merged 2 commits on Jul 27, 2023
Changes from 1 commit
[AGENT-9940] Add debug logs
vivek-datadog committed Jul 26, 2023
commit d329491e6f4145bf9db023970de50ed841a61d19
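
All of the new lines log through the check's `self.log` with lazy `%s` formatting, so arguments are interpolated only when debug logging is actually enabled. A minimal standalone sketch of that pattern with the stdlib `logging` module (the logger name and sample data are made up for illustration):

    import logging

    logging.basicConfig(level=logging.DEBUG)
    log = logging.getLogger('kafka_consumer')

    highwater_offsets = {('orders', 0): 1500}  # made-up sample data

    # Lazy %-formatting: string interpolation is deferred until the
    # logging framework confirms that DEBUG is enabled.
    log.debug('Got %s highwater offsets', len(highwater_offsets))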
11 changes: 11 additions & 0 deletions kafka_consumer/datadog_checks/kafka_consumer/client.py
@@ -76,6 +76,7 @@ def __get_authentication_config(self):
return config

def get_highwater_offsets(self, consumer_offsets):
self.log.debug('Getting highwater offsets')
highwater_offsets = {}
topics_with_consumer_offset = {}
if not self.config._monitor_all_broker_highwatermarks:
@@ -84,6 +85,7 @@ def get_highwater_offsets(self, consumer_offsets):
for consumer_group in consumer_offsets.items():
consumer = self.__create_consumer(consumer_group)
topics = consumer.list_topics(timeout=self.config._request_timeout)
self.log.debug('CONSUMER GROUP: %s', consumer_group)

for topic in topics.topics:
topic_partitions = [
@@ -98,8 +100,13 @@
):
_, high_offset = consumer.get_watermark_offsets(topic_partition)

self.log.debug('TOPIC: %s', topic)
self.log.debug('PARTITION: %s', partition)
self.log.debug('HIGHWATER OFFSET: %s', high_offset)

highwater_offsets[(topic, partition)] = high_offset

self.log.debug('Got %s highwater offsets', len(highwater_offsets))
return highwater_offsets

def get_partitions_for_topic(self, topic):
@@ -119,11 +126,14 @@ def request_metadata_update(self):

def get_consumer_offsets(self):
# {(consumer_group, topic, partition): offset}
self.log.debug('Getting consumer offsets')
consumer_offsets = {}

consumer_groups = self._get_consumer_groups()
self.log.debug('Identified %s consumer groups', len(consumer_groups))

futures = self._get_consumer_offset_futures(consumer_groups)
self.log.debug('%s futures to be waited on', len(futures))

for future in as_completed(futures):
try:
@@ -165,6 +175,7 @@ def get_consumer_offsets(self):
if self.config._consumer_groups_compiled_regex.match(to_match):
consumer_offsets[(consumer_group, topic, partition)] = offset

self.log.debug('Got %s consumer offsets', len(consumer_offsets))
return consumer_offsets

def _get_consumer_groups(self):
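For context, here is a hedged sketch of the lookup these debug lines trace, calling confluent-kafka directly rather than through the check's client wrapper (the broker address, group id, and timeout are assumptions for illustration, not values from this PR):

    from confluent_kafka import Consumer, TopicPartition

    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',  # assumed broker address
        'group.id': 'example-group',            # assumed consumer group
    })

    metadata = consumer.list_topics(timeout=5)
    for topic_name, topic_metadata in metadata.topics.items():
        for partition_id in topic_metadata.partitions:
            tp = TopicPartition(topic_name, partition_id)
            # get_watermark_offsets returns (low, high); "high" is the
            # highwater offset the new debug lines print.
            low, high = consumer.get_watermark_offsets(tp, timeout=5)
            print(topic_name, partition_id, high)

    consumer.close()
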
12 changes: 12 additions & 0 deletions kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
@@ -57,6 +57,12 @@ def check(self, _):
raise

total_contexts = len(consumer_offsets) + len(highwater_offsets)
self.log.debug(
"Total contexts: %s, Consumer offsets: %s, Highwater offsets: %s",
total_contexts,
consumer_offsets,
highwater_offsets,
)
if total_contexts >= self._context_limit:
self.warning(
"""Discovered %s metric contexts - this exceeds the maximum number of %s contexts permitted by the
@@ -79,9 +85,11 @@ def report_highwater_offsets(self, highwater_offsets, contexts_limit):
broker_tags = ['topic:%s' % topic, 'partition:%s' % partition]
broker_tags.extend(self.config._custom_tags)
self.gauge('broker_offset', highwater_offset, tags=broker_tags)
self.log.debug('%s highwater offset reported with %s tags', highwater_offset, broker_tags)
reported_contexts += 1
if reported_contexts == contexts_limit:
return
self.log.debug('%s highwater offsets reported', reported_contexts)

def report_consumer_offsets_and_lag(self, consumer_offsets, highwater_offsets, contexts_limit):
"""Report the consumer offsets and consumer lag."""
@@ -94,6 +102,7 @@ def report_consumer_offsets_and_lag(self, consumer_offsets, highwater_offsets, contexts_limit):
str(reported_contexts),
str(contexts_limit),
)
self.log.debug('%s consumer offsets reported', reported_contexts)
return
consumer_group_tags = ['topic:%s' % topic, 'partition:%s' % partition, 'consumer_group:%s' % consumer_group]
consumer_group_tags.extend(self.config._custom_tags)
@@ -104,6 +113,7 @@ def report_consumer_offsets_and_lag(self, consumer_offsets, highwater_offsets, contexts_limit):
# report consumer offset if the partition is valid because even if leaderless the consumer offset will
# be valid once the leader failover completes
self.gauge('consumer_offset', consumer_offset, tags=consumer_group_tags)
self.log.debug('%s consumer offset reported with %s tags', consumer_offset, consumer_group_tags)
reported_contexts += 1

if (topic, partition) not in highwater_offsets:
@@ -119,6 +129,7 @@ def report_consumer_offsets_and_lag(self, consumer_offsets, highwater_offsets, contexts_limit):
consumer_lag = producer_offset - consumer_offset
if reported_contexts < contexts_limit:
self.gauge('consumer_lag', consumer_lag, tags=consumer_group_tags)
self.log.debug('%s consumer lag reported with %s tags', consumer_lag, consumer_group_tags)
reported_contexts += 1

if consumer_lag < 0:
@@ -145,6 +156,7 @@ def report_consumer_offsets_and_lag(self, consumer_offsets, highwater_offsets, contexts_limit):
)
self.log.warning(msg, consumer_group, topic, partition)
self.client.request_metadata_update() # force metadata update on next poll()
self.log.debug('%s consumer offsets reported', reported_contexts)

def send_event(self, title, text, tags, event_type, aggregation_key, severity='info'):
"""Emit an event to the Datadog Event Stream."""
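As a usage note, the lag value logged in report_consumer_offsets_and_lag is plain offset arithmetic against the matching highwater entry. A small self-contained sketch with made-up offsets (the topic, group, and numbers are illustrative only):

    highwater_offsets = {('orders', 0): 1500}            # broker-side high watermark
    consumer_offsets = {('billing', 'orders', 0): 1420}  # committed consumer offset

    for (group, topic, partition), consumer_offset in consumer_offsets.items():
        producer_offset = highwater_offsets[(topic, partition)]
        consumer_lag = producer_offset - consumer_offset  # 80 messages behind
        tags = ['topic:%s' % topic, 'partition:%s' % partition, 'consumer_group:%s' % group]
        print('consumer_lag=%s tags=%s' % (consumer_lag, tags))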