diff --git a/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py index 5947b449fa4e3..1e0e285d089b8 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/new_kafka_consumer.py @@ -192,6 +192,10 @@ def _get_highwater_offsets(self): # Loop until all futures resolved. self.kafka_client._wait_for_futures(highwater_futures) + # FIXME: This is using a workaround to skip socket wakeup, which causes blocking + # (see https://github.com/dpkp/kafka-python/issues/2286). + # Once https://github.com/dpkp/kafka-python/pull/2335 is merged in, we can use the official + # implementation for this function instead. def _send_request_to_node(self, node_id, request, wakeup=True): while not self.kafka_client._client.ready(node_id): # poll until the connection to broker is ready, otherwise send() @@ -376,6 +380,10 @@ def _get_consumer_offsets(self): if self._monitor_unlisted_consumer_groups: for broker in self.kafka_client._client.cluster.brokers(): + # FIXME: This is using a workaround to skip socket wakeup, which causes blocking + # (see https://github.com/dpkp/kafka-python/issues/2286). + # Once https://github.com/dpkp/kafka-python/pull/2335 is merged in, we can use the official + # implementation for this function instead. list_groups_future = self._list_consumer_groups_send_request(broker.nodeId) list_groups_future.add_callback(self._list_groups_callback, broker.nodeId) self._consumer_futures.append(list_groups_future)