Skip to content

Commit

Permalink
Add comment
Browse files Browse the repository at this point in the history
  • Loading branch information
yzhan289 committed Nov 15, 2022
1 parent 0c6669f commit 4d74a9f
Showing 1 changed file with 8 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ def _get_highwater_offsets(self):
# Loop until all futures resolved.
self.kafka_client._wait_for_futures(highwater_futures)

# FIXME: This is using a workaround to skip socket wakeup, which causes blocking
# (see https://github.com/dpkp/kafka-python/issues/2286).
# Once https://github.com/dpkp/kafka-python/pull/2335 is merged in, we can use the official
# implementation for this function instead.
def _send_request_to_node(self, node_id, request, wakeup=True):
while not self.kafka_client._client.ready(node_id):
# poll until the connection to broker is ready, otherwise send()
Expand Down Expand Up @@ -376,6 +380,10 @@ def _get_consumer_offsets(self):

if self._monitor_unlisted_consumer_groups:
for broker in self.kafka_client._client.cluster.brokers():
# FIXME: This is using a workaround to skip socket wakeup, which causes blocking
# (see https://github.com/dpkp/kafka-python/issues/2286).
# Once https://github.com/dpkp/kafka-python/pull/2335 is merged in, we can use the official
# implementation for this function instead.
list_groups_future = self._list_consumer_groups_send_request(broker.nodeId)
list_groups_future.add_callback(self._list_groups_callback, broker.nodeId)
self._consumer_futures.append(list_groups_future)
Expand Down

0 comments on commit 4d74a9f

Please sign in to comment.