Skip to content

Commit

Permalink
Handle missing partitions and better logging (#9089)
Browse files Browse the repository at this point in the history
  • Loading branch information
mgarabed authored Apr 5, 2021
1 parent 282ace6 commit d096f8e
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 16 deletions.
22 changes: 14 additions & 8 deletions kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,9 @@ def _report_consumer_offsets_and_lag(self, contexts_limit):
return
consumer_group_tags = ['topic:%s' % topic, 'partition:%s' % partition, 'consumer_group:%s' % consumer_group]
consumer_group_tags.extend(self._custom_tags)
if partition in self.kafka_client._client.cluster.partitions_for_topic(topic):

partitions = self.kafka_client._client.cluster.partitions_for_topic(topic)
if partition in partitions:
# report consumer offset if the partition is valid because even if leaderless the consumer offset will
# be valid once the leader failover completes
self.gauge('consumer_offset', consumer_offset, tags=consumer_group_tags)
Expand Down Expand Up @@ -315,13 +317,17 @@ def _report_consumer_offsets_and_lag(self, contexts_limit):
self.log.debug(message)

else:
self.log.warning(
"Consumer group: %s has offsets for topic: %s, partition: %s, but that topic partition doesn't "
"actually exist in the cluster so skipping reporting these offsets.",
consumer_group,
topic,
partition,
)
if partitions is None:
msg = (
"Consumer group: %s has offsets for topic: %s, partition: %s, but that topic has no partitions "
"in the cluster, so skipping reporting these offsets.",
)
else:
msg = (
"Consumer group: %s has offsets for topic: %s, partition: %s, but that topic partition isn't "
"included in the cluster partitions, so skipping reporting these offsets.",
)
self.log.warning(msg, consumer_group, topic, partition)
self.kafka_client._client.cluster.request_update() # force metadata update on next poll()

def _get_consumer_offsets(self):
Expand Down
22 changes: 14 additions & 8 deletions kafka_consumer/datadog_checks/kafka_consumer/legacy_0_10_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,9 @@ def _report_consumer_offsets_and_lag(self, consumer_offsets, **kwargs):
if 'source' in kwargs:
consumer_group_tags.append('source:%s' % kwargs['source'])
consumer_group_tags.extend(self._custom_tags)
if partition in self._kafka_client.cluster.partitions_for_topic(topic):

partitions = self._kafka_client.cluster.partitions_for_topic(topic)
if partitions is not None and partition in partitions:
# report consumer offset if the partition is valid because even if leaderless the consumer offset will
# be valid once the leader failover completes
self.gauge('consumer_offset', consumer_offset, tags=consumer_group_tags)
Expand Down Expand Up @@ -319,13 +321,17 @@ def _report_consumer_offsets_and_lag(self, consumer_offsets, **kwargs):
self.log.debug(message)

else:
self.log.warning(
"Consumer group: %s has offsets for topic: %s, partition: %s, but that topic partition doesn't "
"appear to exist in the cluster so skipping reporting these offsets.",
consumer_group,
topic,
partition,
)
if partitions is None:
msg = (
"Consumer group: %s has offsets for topic: %s, partition: %s, but that topic has no partitions "
"in the cluster, so skipping reporting these offsets.",
)
else:
msg = (
"Consumer group: %s has offsets for topic: %s, partition: %s, but that topic partition isn't "
"included in the cluster partitions, so skipping reporting these offsets.",
)
self.log.warning(msg, consumer_group, topic, partition)
self._kafka_client.cluster.request_update() # force metadata update on next poll()

def _get_zk_path_children(self, zk_path, name_for_error):
Expand Down

0 comments on commit d096f8e

Please sign in to comment.