Skip to content

Commit

Permalink
Admin protocol updates (#1948)
Browse files Browse the repository at this point in the history
  • Loading branch information
TylerLubeck authored and dpkp committed Dec 29, 2019
1 parent e3362ac commit e06ea70
Show file tree
Hide file tree
Showing 3 changed files with 284 additions and 30 deletions.
37 changes: 27 additions & 10 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False):
create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
timeout=timeout_ms
)
elif version <= 2:
elif version <= 3:
request = CreateTopicsRequest[version](
create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
timeout=timeout_ms,
Expand All @@ -459,7 +459,7 @@ def delete_topics(self, topics, timeout_ms=None):
"""
version = self._matching_api_version(DeleteTopicsRequest)
timeout_ms = self._validate_timeout(timeout_ms)
if version <= 1:
if version <= 3:
request = DeleteTopicsRequest[version](
topics=topics,
timeout=timeout_ms
Expand Down Expand Up @@ -803,7 +803,7 @@ def describe_configs(self, config_resources, include_synonyms=False):
DescribeConfigsRequest[version](resources=topic_resources)
))

elif version == 1:
elif version <= 2:
if len(broker_resources) > 0:
for broker_resource in broker_resources:
try:
Expand Down Expand Up @@ -853,7 +853,7 @@ def alter_configs(self, config_resources):
:return: Appropriate version of AlterConfigsResponse class.
"""
version = self._matching_api_version(AlterConfigsRequest)
if version == 0:
if version <= 1:
request = AlterConfigsRequest[version](
resources=[self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources]
)
Expand Down Expand Up @@ -901,7 +901,7 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
"""
version = self._matching_api_version(CreatePartitionsRequest)
timeout_ms = self._validate_timeout(timeout_ms)
if version == 0:
if version <= 1:
request = CreatePartitionsRequest[version](
topic_partitions=[self._convert_create_partitions_request(topic_name, new_partitions) for topic_name, new_partitions in topic_partitions.items()],
timeout=timeout_ms,
Expand All @@ -928,7 +928,7 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
# describe delegation_token protocol not yet implemented
# Note: send the request to the least_loaded_node()

def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id):
def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id, include_authorized_operations=False):
"""Send a DescribeGroupsRequest to the group's coordinator.
:param group_id: The group name as a string
Expand All @@ -937,13 +937,24 @@ def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id)
:return: A message future.
"""
version = self._matching_api_version(DescribeGroupsRequest)
if version <= 1:
if version <= 2:
if include_authorized_operations:
raise IncompatibleBrokerVersion(
"include_authorized_operations requests "
"DescribeGroupsRequest >= v3, which is not "
"supported by Kafka {}".format(version)
)
# Note: KAFKA-6788 A potential optimization is to group the
# request per coordinator and send one request with a list of
# all consumer groups. Java still hasn't implemented this
# because the error checking is hard to get right when some
# groups error and others don't.
request = DescribeGroupsRequest[version](groups=(group_id,))
elif version <= 3:
request = DescribeGroupsRequest[version](
groups=(group_id,),
include_authorized_operations=include_authorized_operations
)
else:
raise NotImplementedError(
"Support for DescribeGroupsRequest_v{} has not yet been added to KafkaAdminClient."
Expand All @@ -952,7 +963,7 @@ def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id)

def _describe_consumer_groups_process_response(self, response):
"""Process a DescribeGroupsResponse into a group description."""
if response.API_VERSION <= 1:
if response.API_VERSION <= 3:
assert len(response.groups) == 1
# TODO need to implement converting the response tuple into
# a more accessible interface like a namedtuple and then stop
Expand All @@ -976,7 +987,7 @@ def _describe_consumer_groups_process_response(self, response):
.format(response.API_VERSION))
return group_description

def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include_authorized_operations=False):
"""Describe a set of consumer groups.
Any errors are immediately raised.
Expand All @@ -989,6 +1000,9 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
useful for avoiding extra network round trips if you already know
the group coordinator. This is only useful when all the group_ids
have the same coordinator, otherwise it will error. Default: None.
:param include_authorized_operatoins: Whether or not to include
information about the operations a group is allowed to perform.
Only supported on API version >= v3. Default: False.
:return: A list of group descriptions. For now the group descriptions
are the raw results from the DescribeGroupsResponse. Long-term, we
plan to change this to return namedtuples as well as decoding the
Expand All @@ -1001,7 +1015,10 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
this_groups_coordinator_id = group_coordinator_id
else:
this_groups_coordinator_id = self._find_coordinator_id(group_id)
f = self._describe_consumer_groups_send_request(group_id, this_groups_coordinator_id)
f = self._describe_consumer_groups_send_request(
group_id,
this_groups_coordinator_id,
include_authorized_operations)
futures.append(f)

self._wait_for_futures(futures)
Expand Down
Loading

0 comments on commit e06ea70

Please sign in to comment.