[AGENT-9940] Optimize high water offset collection
vivek-datadog committed Jul 24, 2023
1 parent 9c8be38 commit 514317a
Showing 1 changed file with 48 additions and 14 deletions: kafka_consumer/datadog_checks/kafka_consumer/client.py
@@ -77,34 +77,68 @@ def __get_authentication_config(self):

     def get_highwater_offsets(self, consumer_offsets):
         self.log.debug('Getting highwater offsets')

         highwater_offsets = {}
-        topics_with_consumer_offset = {}
+        topics_with_consumer_offset = set()
+        topic_partition_with_consumer_offset = set()

         if not self.config._monitor_all_broker_highwatermarks:
-            topics_with_consumer_offset = {(topic, partition) for (_, topic, partition) in consumer_offsets}
+            for (_, topic, partition) in consumer_offsets:
+                topics_with_consumer_offset.add(topic)
+                topic_partition_with_consumer_offset.add((topic, partition))

-        for consumer_group in consumer_offsets.items():
-            consumer = self.__create_consumer(consumer_group)
-            topics = consumer.list_topics(timeout=self.config._request_timeout)
+        clusters_queried = set()
+        consumer_groups_checked = set()
+
+        for consumer_group, _, _ in consumer_offsets:
+            self.log.debug('CONSUMER GROUP: %s', consumer_group)
+            if consumer_group in consumer_groups_checked:
+                self.log.debug('Consumer group %s topics already queried, skipping it', consumer_group)
+                continue
+            consumer = self.__create_consumer(consumer_group)
+            cluster_metadata = consumer.list_topics(timeout=self.config._request_timeout)

-            for topic in topics.topics:
-                topic_partitions = [
-                    TopicPartition(topic, partition) for partition in list(topics.topics[topic].partitions.keys())
-                ]
+            # Cluster id string, if supported by the broker, else None
+            cluster_id = cluster_metadata.cluster_id

-                for topic_partition in topic_partitions:
-                    partition = topic_partition.partition
-                    if topic not in KAFKA_INTERNAL_TOPICS and (
+            # Avoid querying the same cluster multiple times
+            if cluster_id in clusters_queried:
+                self.log.debug("Cluster %s topics already queried. Skipping it", cluster_metadata.cluster_id)
+                continue
+            # Check for existence as cluster_id is an optional value
+            elif cluster_id is not None:
+                clusters_queried.add(cluster_metadata.cluster_id)
+
+            topics = cluster_metadata.topics
+
+            for topic in topics:
+                if topic in KAFKA_INTERNAL_TOPICS:
+                    self.log.debug("Skipping internal topic %s", topic)
+                    continue
+                if not self.config._monitor_all_broker_highwatermarks and topic not in topics_with_consumer_offset:
+                    self.log.debug("Skipping non-relevant topic %s", topic)
+                    continue
+
+                for partition in topics[topic].partitions:
+                    if (topic, partition) in highwater_offsets:
+                        # Query the earliest occurrence of highwater offset. This can be converted to get the latest
+                        # but that will be at the cost of multiple broker calls
+                        self.log.debug("Skipping %s, %s as highwater offset is already collected", topic, partition)
+                        continue
+                    if (
                         self.config._monitor_all_broker_highwatermarks
-                        or (topic, partition) in topics_with_consumer_offset
+                        or (topic, partition) in topic_partition_with_consumer_offset
                     ):
-                        _, high_offset = consumer.get_watermark_offsets(topic_partition)
+                        _, high_offset = consumer.get_watermark_offsets(TopicPartition(topic, partition))

                         self.log.debug('TOPIC: %s', topic)
                         self.log.debug('PARTITION: %s', partition)
                         self.log.debug('HIGHWATER OFFSET: %s', high_offset)

                         highwater_offsets[(topic, partition)] = high_offset
+                    else:
+                        self.log.debug("Skipping non-relevant partition %s of topic %s", partition, topic)
+            consumer_groups_checked.add(consumer_group)

         self.log.debug('Got %s highwater offsets', len(highwater_offsets))
         return highwater_offsets
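
For reference, the optimization this commit introduces (query topic metadata once per consumer group, sweep watermarks once per cluster id, and record each (topic, partition) high offset only on first sight) can be distilled into a standalone sketch. It assumes confluent-kafka, whose Consumer.list_topics, ClusterMetadata.cluster_id, and Consumer.get_watermark_offsets calls match those in the diff; collect_highwater_offsets and the make_consumer factory are hypothetical stand-ins for the check's method and its private __create_consumer helper, not part of the shipped integration.

    from confluent_kafka import Consumer, TopicPartition

    def collect_highwater_offsets(consumer_groups, make_consumer, timeout=5.0):
        """Fetch each partition's high watermark once, even when several
        consumer groups resolve to the same Kafka cluster (sketch only)."""
        highwater_offsets = {}
        clusters_queried = set()
        groups_checked = set()

        for group in consumer_groups:
            # In the check, a group can appear once per (group, topic, partition)
            # triple, so repeated groups are skipped up front.
            if group in groups_checked:
                continue
            groups_checked.add(group)

            consumer = make_consumer(group)  # hypothetical factory, one Consumer per group
            cluster_metadata = consumer.list_topics(timeout=timeout)

            # cluster_id is optional; brokers that report one are queried only once
            cluster_id = cluster_metadata.cluster_id
            if cluster_id in clusters_queried:
                continue
            if cluster_id is not None:
                clusters_queried.add(cluster_id)

            for topic, topic_metadata in cluster_metadata.topics.items():
                for partition in topic_metadata.partitions:
                    if (topic, partition) in highwater_offsets:
                        continue  # keep the first answer; refreshing costs extra broker calls
                    _, high = consumer.get_watermark_offsets(TopicPartition(topic, partition))
                    highwater_offsets[(topic, partition)] = high

        return highwater_offsets

    # Example wiring with a placeholder broker address:
    # offsets = collect_highwater_offsets(
    #     ['my-consumer-group'],
    #     lambda group: Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': group}),
    # )

Keying the dedup on cluster_id collapses the work to one metadata and watermark sweep per cluster rather than one per consumer group; since cluster_id is optional, brokers that do not report one simply fall back to being queried each time, trading duplicate sweeps for correctness.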