diff --git a/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py
index b106c0e140ea4..201cfa1e15898 100644
--- a/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py
+++ b/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py
@@ -239,6 +239,7 @@ def _highwater_offsets_callback(self, response):
     def _report_highwater_offsets(self, contexts_limit):
         """Report the broker highwater offsets."""
         reported_contexts = 0
+        self.log.debug("Reporting broker offset metric")
         for (topic, partition), highwater_offset in self._highwater_offsets.items():
             broker_tags = ['topic:%s' % topic, 'partition:%s' % partition]
             broker_tags.extend(self._custom_tags)
@@ -250,13 +251,20 @@ def _report_highwater_offsets(self, contexts_limit):
     def _report_consumer_offsets_and_lag(self, contexts_limit):
         """Report the consumer offsets and consumer lag."""
         reported_contexts = 0
+        self.log.debug("Reporting consumer offsets and lag metrics")
         for (consumer_group, topic, partition), consumer_offset in self._consumer_offsets.items():
             if reported_contexts >= contexts_limit:
+                self.log.debug(
+                    "Reported contexts number %s greater than or equal to contexts limit of %s, returning",
+                    str(reported_contexts),
+                    str(contexts_limit),
+                )
                 return
             consumer_group_tags = ['topic:%s' % topic, 'partition:%s' % partition, 'consumer_group:%s' % consumer_group]
             consumer_group_tags.extend(self._custom_tags)

             partitions = self.kafka_client._client.cluster.partitions_for_topic(topic)
+            self.log.debug("Received partitions %s for topic %s", partitions, topic)
             if partitions is not None and partition in partitions:
                 # report consumer offset if the partition is valid because even if leaderless the consumer offset will
                 # be valid once the leader failover completes
@@ -445,6 +453,7 @@ def _single_group_offsets_callback(self, consumer_group, response):
         associate these offsets to the proper consumer group.
         """
         single_group_offsets = self.kafka_client._list_consumer_group_offsets_process_response(response)
+        self.log.debug("Single group offsets: %s", single_group_offsets)
         for (topic, partition), (offset, _metadata) in single_group_offsets.items():
             # If the OffsetFetchRequest explicitly specified partitions, the offset could returned as -1, meaning there
             # is no recorded offset for that partition... for example, if the partition doesn't exist in the cluster.