Skip to content

Commit

Permalink
Use the controller for topic metadata requests
Browse files Browse the repository at this point in the history
Closes #1994
  • Loading branch information
TylerLubeck authored and jeffwidman committed Feb 6, 2020
1 parent da01fef commit f92889a
Showing 1 changed file with 9 additions and 6 deletions.
15 changes: 9 additions & 6 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ def delete_topics(self, topics, timeout_ms=None):
return response


def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
def _get_cluster_metadata(self, topics=None, auto_topic_creation=False, use_controller=False):
"""
topics == None means "get all topics"
"""
Expand All @@ -492,10 +492,13 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
allow_auto_topic_creation=auto_topic_creation
)

future = self._send_request_to_node(
self._client.least_loaded_node(),
request
)
if use_controller:
future = self._send_request_to_controller(request)
else:
future = self._send_request_to_node(
self._client.least_loaded_node(),
request
)
self._wait_for_futures([future])
return future.value

Expand All @@ -505,7 +508,7 @@ def list_topics(self):
return [t['topic'] for t in obj['topics']]

def describe_topics(self, topics=None):
metadata = self._get_cluster_metadata(topics=topics)
metadata = self._get_cluster_metadata(topics=topics, use_controller=True)
obj = metadata.to_object()
return obj['topics']

Expand Down

0 comments on commit f92889a

Please sign in to comment.