From d096f8e7934916f792f6a46a3040a5ec6688dc7b Mon Sep 17 00:00:00 2001
From: Mike Garabedian
Date: Mon, 5 Apr 2021 16:43:02 -0400
Subject: [PATCH] Handle missing partitions and better logging (#9089)

---
 .../kafka_consumer/kafka_consumer.py | 22 ++++++++++++++--------
 .../kafka_consumer/legacy_0_10_2.py  | 22 ++++++++++++++--------
 2 files changed, 28 insertions(+), 16 deletions(-)

diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
index 0af8a4805cbd6..7e4740cac680f 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
+++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
@@ -281,7 +281,9 @@ def _report_consumer_offsets_and_lag(self, contexts_limit):
                 return
             consumer_group_tags = ['topic:%s' % topic, 'partition:%s' % partition, 'consumer_group:%s' % consumer_group]
             consumer_group_tags.extend(self._custom_tags)
-            if partition in self.kafka_client._client.cluster.partitions_for_topic(topic):
+
+            partitions = self.kafka_client._client.cluster.partitions_for_topic(topic)
+            if partition in partitions:
                 # report consumer offset if the partition is valid because even if leaderless the consumer offset will
                 # be valid once the leader failover completes
                 self.gauge('consumer_offset', consumer_offset, tags=consumer_group_tags)
@@ -315,13 +317,17 @@ def _report_consumer_offsets_and_lag(self, contexts_limit):
                     self.log.debug(message)
 
             else:
-                self.log.warning(
-                    "Consumer group: %s has offsets for topic: %s, partition: %s, but that topic partition doesn't "
-                    "actually exist in the cluster so skipping reporting these offsets.",
-                    consumer_group,
-                    topic,
-                    partition,
-                )
+                if partitions is None:
+                    msg = (
+                        "Consumer group: %s has offsets for topic: %s, partition: %s, but that topic has no partitions "
+                        "in the cluster, so skipping reporting these offsets.",
+                    )
+                else:
+                    msg = (
+                        "Consumer group: %s has offsets for topic: %s, partition: %s, but that topic partition isn't "
+                        "included in the cluster partitions, so skipping reporting these offsets.",
+                    )
+                self.log.warning(msg, consumer_group, topic, partition)
                 self.kafka_client._client.cluster.request_update()  # force metadata update on next poll()
 
     def _get_consumer_offsets(self):
diff --git a/kafka_consumer/datadog_checks/kafka_consumer/legacy_0_10_2.py b/kafka_consumer/datadog_checks/kafka_consumer/legacy_0_10_2.py
index 3e40b85f5aaad..ab149fc89cd37 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/legacy_0_10_2.py
+++ b/kafka_consumer/datadog_checks/kafka_consumer/legacy_0_10_2.py
@@ -289,7 +289,9 @@ def _report_consumer_offsets_and_lag(self, consumer_offsets, **kwargs):
             if 'source' in kwargs:
                 consumer_group_tags.append('source:%s' % kwargs['source'])
             consumer_group_tags.extend(self._custom_tags)
-            if partition in self._kafka_client.cluster.partitions_for_topic(topic):
+
+            partitions = self._kafka_client.cluster.partitions_for_topic(topic)
+            if partitions is not None and partition in partitions:
                 # report consumer offset if the partition is valid because even if leaderless the consumer offset will
                 # be valid once the leader failover completes
                 self.gauge('consumer_offset', consumer_offset, tags=consumer_group_tags)
@@ -319,13 +321,17 @@ def _report_consumer_offsets_and_lag(self, consumer_offsets, **kwargs):
                     self.log.debug(message)
 
             else:
-                self.log.warning(
-                    "Consumer group: %s has offsets for topic: %s, partition: %s, but that topic partition doesn't "
-                    "appear to exist in the cluster so skipping reporting these offsets.",
-                    consumer_group,
-                    topic,
-                    partition,
-                )
+                if partitions is None:
+                    msg = (
+                        "Consumer group: %s has offsets for topic: %s, partition: %s, but that topic has no partitions "
+                        "in the cluster, so skipping reporting these offsets.",
+                    )
+                else:
+                    msg = (
+                        "Consumer group: %s has offsets for topic: %s, partition: %s, but that topic partition isn't "
+                        "included in the cluster partitions, so skipping reporting these offsets.",
+                    )
+                self.log.warning(msg, consumer_group, topic, partition)
                 self._kafka_client.cluster.request_update()  # force metadata update on next poll()
 
     def _get_zk_path_children(self, zk_path, name_for_error):