diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 2cb5c57d2..454c5b3b1 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -473,7 +473,7 @@ def delete_topics(self, topics, timeout_ms=None): return response - def _get_cluster_metadata(self, topics=None, auto_topic_creation=False): + def _get_cluster_metadata(self, topics=None, auto_topic_creation=False, use_controller=False): """ topics == None means "get all topics" """ @@ -492,10 +492,13 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False): allow_auto_topic_creation=auto_topic_creation ) - future = self._send_request_to_node( - self._client.least_loaded_node(), - request - ) + if use_controller: + future = self._send_request_to_controller(request) + else: + future = self._send_request_to_node( + self._client.least_loaded_node(), + request + ) self._wait_for_futures([future]) return future.value @@ -505,7 +508,7 @@ def list_topics(self): return [t['topic'] for t in obj['topics']] def describe_topics(self, topics=None): - metadata = self._get_cluster_metadata(topics=topics) + metadata = self._get_cluster_metadata(topics=topics, use_controller=True) obj = metadata.to_object() return obj['topics']