Skip to content

Commit

Permalink
Add kafka consumer logs for more visibility (#13679)
Browse files Browse the repository at this point in the history
* Add kafka consumer logs for more visibility

* Fix style
  • Loading branch information
yzhan289 authored Jan 19, 2023
1 parent 682853a commit 8f5367d
Showing 1 changed file with 9 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ def _highwater_offsets_callback(self, response):
def _report_highwater_offsets(self, contexts_limit):
"""Report the broker highwater offsets."""
reported_contexts = 0
self.log.debug("Reporting broker offset metric")
for (topic, partition), highwater_offset in self._highwater_offsets.items():
broker_tags = ['topic:%s' % topic, 'partition:%s' % partition]
broker_tags.extend(self._custom_tags)
Expand All @@ -250,13 +251,20 @@ def _report_highwater_offsets(self, contexts_limit):
def _report_consumer_offsets_and_lag(self, contexts_limit):
"""Report the consumer offsets and consumer lag."""
reported_contexts = 0
self.log.debug("Reporting consumer offsets and lag metrics")
for (consumer_group, topic, partition), consumer_offset in self._consumer_offsets.items():
if reported_contexts >= contexts_limit:
self.log.debug(
"Reported contexts number %s greater than or equal to contexts limit of %s, returning",
str(reported_contexts),
str(contexts_limit),
)
return
consumer_group_tags = ['topic:%s' % topic, 'partition:%s' % partition, 'consumer_group:%s' % consumer_group]
consumer_group_tags.extend(self._custom_tags)

partitions = self.kafka_client._client.cluster.partitions_for_topic(topic)
self.log.debug("Received partitions %s for topic %s", partitions, topic)
if partitions is not None and partition in partitions:
# report consumer offset if the partition is valid because even if leaderless the consumer offset will
# be valid once the leader failover completes
Expand Down Expand Up @@ -445,6 +453,7 @@ def _single_group_offsets_callback(self, consumer_group, response):
associate these offsets to the proper consumer group.
"""
single_group_offsets = self.kafka_client._list_consumer_group_offsets_process_response(response)
self.log.debug("Single group offsets: %s", single_group_offsets)
for (topic, partition), (offset, _metadata) in single_group_offsets.items():
# If the OffsetFetchRequest explicitly specified partitions, the offset could returned as -1, meaning there
# is no recorded offset for that partition... for example, if the partition doesn't exist in the cluster.
Expand Down

0 comments on commit 8f5367d

Please sign in to comment.