From e06ea70174e0b114bec8072371a54ae6bcd73da5 Mon Sep 17 00:00:00 2001 From: Tyler Lubeck Date: Sun, 29 Dec 2019 15:47:32 -0800 Subject: [PATCH] Admin protocol updates (#1948) --- kafka/admin/client.py | 37 +++- kafka/protocol/admin.py | 259 +++++++++++++++++++++++-- test/test_api_object_implementation.py | 18 ++ 3 files changed, 284 insertions(+), 30 deletions(-) create mode 100644 test/test_api_object_implementation.py diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 8afe95b4f..accbf1468 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -435,7 +435,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False): create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics], timeout=timeout_ms ) - elif version <= 2: + elif version <= 3: request = CreateTopicsRequest[version]( create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics], timeout=timeout_ms, @@ -459,7 +459,7 @@ def delete_topics(self, topics, timeout_ms=None): """ version = self._matching_api_version(DeleteTopicsRequest) timeout_ms = self._validate_timeout(timeout_ms) - if version <= 1: + if version <= 3: request = DeleteTopicsRequest[version]( topics=topics, timeout=timeout_ms @@ -803,7 +803,7 @@ def describe_configs(self, config_resources, include_synonyms=False): DescribeConfigsRequest[version](resources=topic_resources) )) - elif version == 1: + elif version <= 2: if len(broker_resources) > 0: for broker_resource in broker_resources: try: @@ -853,7 +853,7 @@ def alter_configs(self, config_resources): :return: Appropriate version of AlterConfigsResponse class. """ version = self._matching_api_version(AlterConfigsRequest) - if version == 0: + if version <= 1: request = AlterConfigsRequest[version]( resources=[self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources] ) @@ -901,7 +901,7 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal """ version = self._matching_api_version(CreatePartitionsRequest) timeout_ms = self._validate_timeout(timeout_ms) - if version == 0: + if version <= 1: request = CreatePartitionsRequest[version]( topic_partitions=[self._convert_create_partitions_request(topic_name, new_partitions) for topic_name, new_partitions in topic_partitions.items()], timeout=timeout_ms, @@ -928,7 +928,7 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal # describe delegation_token protocol not yet implemented # Note: send the request to the least_loaded_node() - def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id): + def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id, include_authorized_operations=False): """Send a DescribeGroupsRequest to the group's coordinator. :param group_id: The group name as a string @@ -937,13 +937,24 @@ def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id) :return: A message future. """ version = self._matching_api_version(DescribeGroupsRequest) - if version <= 1: + if version <= 2: + if include_authorized_operations: + raise IncompatibleBrokerVersion( + "include_authorized_operations requests " + "DescribeGroupsRequest >= v3, which is not " + "supported by Kafka {}".format(version) + ) # Note: KAFKA-6788 A potential optimization is to group the # request per coordinator and send one request with a list of # all consumer groups. Java still hasn't implemented this # because the error checking is hard to get right when some # groups error and others don't. request = DescribeGroupsRequest[version](groups=(group_id,)) + elif version <= 3: + request = DescribeGroupsRequest[version]( + groups=(group_id,), + include_authorized_operations=include_authorized_operations + ) else: raise NotImplementedError( "Support for DescribeGroupsRequest_v{} has not yet been added to KafkaAdminClient." @@ -952,7 +963,7 @@ def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id) def _describe_consumer_groups_process_response(self, response): """Process a DescribeGroupsResponse into a group description.""" - if response.API_VERSION <= 1: + if response.API_VERSION <= 3: assert len(response.groups) == 1 # TODO need to implement converting the response tuple into # a more accessible interface like a namedtuple and then stop @@ -976,7 +987,7 @@ def _describe_consumer_groups_process_response(self, response): .format(response.API_VERSION)) return group_description - def describe_consumer_groups(self, group_ids, group_coordinator_id=None): + def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include_authorized_operations=False): """Describe a set of consumer groups. Any errors are immediately raised. @@ -989,6 +1000,9 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None): useful for avoiding extra network round trips if you already know the group coordinator. This is only useful when all the group_ids have the same coordinator, otherwise it will error. Default: None. + :param include_authorized_operatoins: Whether or not to include + information about the operations a group is allowed to perform. + Only supported on API version >= v3. Default: False. :return: A list of group descriptions. For now the group descriptions are the raw results from the DescribeGroupsResponse. Long-term, we plan to change this to return namedtuples as well as decoding the @@ -1001,7 +1015,10 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None): this_groups_coordinator_id = group_coordinator_id else: this_groups_coordinator_id = self._find_coordinator_id(group_id) - f = self._describe_consumer_groups_send_request(group_id, this_groups_coordinator_id) + f = self._describe_consumer_groups_send_request( + group_id, + this_groups_coordinator_id, + include_authorized_operations) futures.append(f) self._wait_for_futures(futures) diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index e6efad784..b2694dc96 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -1,7 +1,7 @@ from __future__ import absolute_import from kafka.protocol.api import Request, Response -from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Schema, String +from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String class ApiVersionResponse_v0(Response): @@ -29,6 +29,12 @@ class ApiVersionResponse_v1(Response): ) +class ApiVersionResponse_v2(Response): + API_KEY = 18 + API_VERSION = 2 + SCHEMA = ApiVersionResponse_v1.SCHEMA + + class ApiVersionRequest_v0(Request): API_KEY = 18 API_VERSION = 0 @@ -43,8 +49,19 @@ class ApiVersionRequest_v1(Request): SCHEMA = ApiVersionRequest_v0.SCHEMA -ApiVersionRequest = [ApiVersionRequest_v0, ApiVersionRequest_v1] -ApiVersionResponse = [ApiVersionResponse_v0, ApiVersionResponse_v1] +class ApiVersionRequest_v2(Request): + API_KEY = 18 + API_VERSION = 2 + RESPONSE_TYPE = ApiVersionResponse_v1 + SCHEMA = ApiVersionRequest_v0.SCHEMA + + +ApiVersionRequest = [ + ApiVersionRequest_v0, ApiVersionRequest_v1, ApiVersionRequest_v2, +] +ApiVersionResponse = [ + ApiVersionResponse_v0, ApiVersionResponse_v1, ApiVersionResponse_v2, +] class CreateTopicsResponse_v0(Response): @@ -79,6 +96,11 @@ class CreateTopicsResponse_v2(Response): ('error_message', String('utf-8')))) ) +class CreateTopicsResponse_v3(Response): + API_KEY = 19 + API_VERSION = 3 + SCHEMA = CreateTopicsResponse_v2.SCHEMA + class CreateTopicsRequest_v0(Request): API_KEY = 19 @@ -126,11 +148,20 @@ class CreateTopicsRequest_v2(Request): SCHEMA = CreateTopicsRequest_v1.SCHEMA +class CreateTopicsRequest_v3(Request): + API_KEY = 19 + API_VERSION = 3 + RESPONSE_TYPE = CreateTopicsResponse_v3 + SCHEMA = CreateTopicsRequest_v1.SCHEMA + + CreateTopicsRequest = [ - CreateTopicsRequest_v0, CreateTopicsRequest_v1, CreateTopicsRequest_v2 + CreateTopicsRequest_v0, CreateTopicsRequest_v1, + CreateTopicsRequest_v2, CreateTopicsRequest_v3, ] CreateTopicsResponse = [ - CreateTopicsResponse_v0, CreateTopicsResponse_v1, CreateTopicsResponse_v2 + CreateTopicsResponse_v0, CreateTopicsResponse_v1, + CreateTopicsResponse_v2, CreateTopicsResponse_v3, ] @@ -155,6 +186,18 @@ class DeleteTopicsResponse_v1(Response): ) +class DeleteTopicsResponse_v2(Response): + API_KEY = 20 + API_VERSION = 2 + SCHEMA = DeleteTopicsResponse_v1.SCHEMA + + +class DeleteTopicsResponse_v3(Response): + API_KEY = 20 + API_VERSION = 3 + SCHEMA = DeleteTopicsResponse_v1.SCHEMA + + class DeleteTopicsRequest_v0(Request): API_KEY = 20 API_VERSION = 0 @@ -172,8 +215,28 @@ class DeleteTopicsRequest_v1(Request): SCHEMA = DeleteTopicsRequest_v0.SCHEMA -DeleteTopicsRequest = [DeleteTopicsRequest_v0, DeleteTopicsRequest_v1] -DeleteTopicsResponse = [DeleteTopicsResponse_v0, DeleteTopicsResponse_v1] +class DeleteTopicsRequest_v2(Request): + API_KEY = 20 + API_VERSION = 2 + RESPONSE_TYPE = DeleteTopicsResponse_v2 + SCHEMA = DeleteTopicsRequest_v0.SCHEMA + + +class DeleteTopicsRequest_v3(Request): + API_KEY = 20 + API_VERSION = 3 + RESPONSE_TYPE = DeleteTopicsResponse_v3 + SCHEMA = DeleteTopicsRequest_v0.SCHEMA + + +DeleteTopicsRequest = [ + DeleteTopicsRequest_v0, DeleteTopicsRequest_v1, + DeleteTopicsRequest_v2, DeleteTopicsRequest_v3, +] +DeleteTopicsResponse = [ + DeleteTopicsResponse_v0, DeleteTopicsResponse_v1, + DeleteTopicsResponse_v2, DeleteTopicsResponse_v3, +] class ListGroupsResponse_v0(Response): @@ -198,6 +261,11 @@ class ListGroupsResponse_v1(Response): ('protocol_type', String('utf-8')))) ) +class ListGroupsResponse_v2(Response): + API_KEY = 16 + API_VERSION = 2 + SCHEMA = ListGroupsResponse_v1.SCHEMA + class ListGroupsRequest_v0(Request): API_KEY = 16 @@ -212,9 +280,21 @@ class ListGroupsRequest_v1(Request): RESPONSE_TYPE = ListGroupsResponse_v1 SCHEMA = ListGroupsRequest_v0.SCHEMA +class ListGroupsRequest_v2(Request): + API_KEY = 16 + API_VERSION = 1 + RESPONSE_TYPE = ListGroupsResponse_v2 + SCHEMA = ListGroupsRequest_v0.SCHEMA -ListGroupsRequest = [ListGroupsRequest_v0, ListGroupsRequest_v1] -ListGroupsResponse = [ListGroupsResponse_v0, ListGroupsResponse_v1] + +ListGroupsRequest = [ + ListGroupsRequest_v0, ListGroupsRequest_v1, + ListGroupsRequest_v2, +] +ListGroupsResponse = [ + ListGroupsResponse_v0, ListGroupsResponse_v1, + ListGroupsResponse_v2, +] class DescribeGroupsResponse_v0(Response): @@ -256,6 +336,33 @@ class DescribeGroupsResponse_v1(Response): ) +class DescribeGroupsResponse_v2(Response): + API_KEY = 15 + API_VERSION = 2 + SCHEMA = DescribeGroupsResponse_v1.SCHEMA + + +class DescribeGroupsResponse_v3(Response): + API_KEY = 15 + API_VERSION = 3 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('groups', Array( + ('error_code', Int16), + ('group', String('utf-8')), + ('state', String('utf-8')), + ('protocol_type', String('utf-8')), + ('protocol', String('utf-8')), + ('members', Array( + ('member_id', String('utf-8')), + ('client_id', String('utf-8')), + ('client_host', String('utf-8')), + ('member_metadata', Bytes), + ('member_assignment', Bytes)))), + ('authorized_operations', Int32)) + ) + + class DescribeGroupsRequest_v0(Request): API_KEY = 15 API_VERSION = 0 @@ -272,8 +379,31 @@ class DescribeGroupsRequest_v1(Request): SCHEMA = DescribeGroupsRequest_v0.SCHEMA -DescribeGroupsRequest = [DescribeGroupsRequest_v0, DescribeGroupsRequest_v1] -DescribeGroupsResponse = [DescribeGroupsResponse_v0, DescribeGroupsResponse_v1] +class DescribeGroupsRequest_v2(Request): + API_KEY = 15 + API_VERSION = 2 + RESPONSE_TYPE = DescribeGroupsResponse_v2 + SCHEMA = DescribeGroupsRequest_v0.SCHEMA + + +class DescribeGroupsRequest_v3(Request): + API_KEY = 15 + API_VERSION = 3 + RESPONSE_TYPE = DescribeGroupsResponse_v2 + SCHEMA = Schema( + ('groups', Array(String('utf-8'))), + ('include_authorized_operations', Boolean) + ) + + +DescribeGroupsRequest = [ + DescribeGroupsRequest_v0, DescribeGroupsRequest_v1, + DescribeGroupsRequest_v2, DescribeGroupsRequest_v3, +] +DescribeGroupsResponse = [ + DescribeGroupsResponse_v0, DescribeGroupsResponse_v1, + DescribeGroupsResponse_v2, DescribeGroupsResponse_v3, +] class SaslHandShakeResponse_v0(Response): @@ -507,6 +637,13 @@ class AlterConfigsResponse_v0(Response): ('resource_name', String('utf-8')))) ) + +class AlterConfigsResponse_v1(Response): + API_KEY = 33 + API_VERSION = 1 + SCHEMA = AlterConfigsResponse_v0.SCHEMA + + class AlterConfigsRequest_v0(Request): API_KEY = 33 API_VERSION = 0 @@ -521,8 +658,14 @@ class AlterConfigsRequest_v0(Request): ('validate_only', Boolean) ) -AlterConfigsRequest = [AlterConfigsRequest_v0] -AlterConfigsResponse = [AlterConfigsResponse_v0] +class AlterConfigsRequest_v1(Request): + API_KEY = 33 + API_VERSION = 1 + RESPONSE_TYPE = AlterConfigsResponse_v1 + SCHEMA = AlterConfigsRequest_v0.SCHEMA + +AlterConfigsRequest = [AlterConfigsRequest_v0, AlterConfigsRequest_v1] +AlterConfigsResponse = [AlterConfigsResponse_v0, AlterConfigsRequest_v1] class DescribeConfigsResponse_v0(Response): @@ -565,6 +708,28 @@ class DescribeConfigsResponse_v1(Response): ('config_source', Int8))))))) ) +class DescribeConfigsResponse_v2(Response): + API_KEY = 32 + API_VERSION = 2 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('resources', Array( + ('error_code', Int16), + ('error_message', String('utf-8')), + ('resource_type', Int8), + ('resource_name', String('utf-8')), + ('config_entries', Array( + ('config_names', String('utf-8')), + ('config_value', String('utf-8')), + ('read_only', Boolean), + ('config_source', Int8), + ('is_sensitive', Boolean), + ('config_synonyms', Array( + ('config_name', String('utf-8')), + ('config_value', String('utf-8')), + ('config_source', Int8))))))) + ) + class DescribeConfigsRequest_v0(Request): API_KEY = 32 API_VERSION = 0 @@ -588,10 +753,25 @@ class DescribeConfigsRequest_v1(Request): ('include_synonyms', Boolean) ) -DescribeConfigsRequest = [DescribeConfigsRequest_v0, DescribeConfigsRequest_v1] -DescribeConfigsResponse = [DescribeConfigsResponse_v0, DescribeConfigsResponse_v1] -class SaslAuthenticateResponse_v0(Request): +class DescribeConfigsRequest_v2(Request): + API_KEY = 32 + API_VERSION = 2 + RESPONSE_TYPE = DescribeConfigsResponse_v2 + SCHEMA = DescribeConfigsRequest_v1.SCHEMA + + +DescribeConfigsRequest = [ + DescribeConfigsRequest_v0, DescribeConfigsRequest_v1, + DescribeConfigsRequest_v2, +] +DescribeConfigsResponse = [ + DescribeConfigsResponse_v0, DescribeConfigsResponse_v1, + DescribeConfigsResponse_v2, +] + + +class SaslAuthenticateResponse_v0(Response): API_KEY = 36 API_VERSION = 0 SCHEMA = Schema( @@ -601,6 +781,17 @@ class SaslAuthenticateResponse_v0(Request): ) +class SaslAuthenticateResponse_v1(Response): + API_KEY = 36 + API_VERSION = 1 + SCHEMA = Schema( + ('error_code', Int16), + ('error_message', String('utf-8')), + ('sasl_auth_bytes', Bytes), + ('session_lifetime_ms', Int64) + ) + + class SaslAuthenticateRequest_v0(Request): API_KEY = 36 API_VERSION = 0 @@ -610,8 +801,19 @@ class SaslAuthenticateRequest_v0(Request): ) -SaslAuthenticateRequest = [SaslAuthenticateRequest_v0] -SaslAuthenticateResponse = [SaslAuthenticateResponse_v0] +class SaslAuthenticateRequest_v1(Request): + API_KEY = 36 + API_VERSION = 1 + RESPONSE_TYPE = SaslAuthenticateResponse_v1 + SCHEMA = SaslAuthenticateRequest_v0.SCHEMA + + +SaslAuthenticateRequest = [ + SaslAuthenticateRequest_v0, SaslAuthenticateRequest_v1, +] +SaslAuthenticateResponse = [ + SaslAuthenticateResponse_v0, SaslAuthenticateResponse_v1, +] class CreatePartitionsResponse_v0(Response): @@ -626,6 +828,12 @@ class CreatePartitionsResponse_v0(Response): ) +class CreatePartitionsResponse_v1(Response): + API_KEY = 37 + API_VERSION = 1 + SCHEMA = CreatePartitionsResponse_v0.SCHEMA + + class CreatePartitionsRequest_v0(Request): API_KEY = 37 API_VERSION = 0 @@ -641,5 +849,16 @@ class CreatePartitionsRequest_v0(Request): ) -CreatePartitionsRequest = [CreatePartitionsRequest_v0] -CreatePartitionsResponse = [CreatePartitionsResponse_v0] +class CreatePartitionsRequest_v1(Request): + API_KEY = 37 + API_VERSION = 1 + SCHEMA = CreatePartitionsRequest_v0.SCHEMA + RESPONSE_TYPE = CreatePartitionsResponse_v1 + + +CreatePartitionsRequest = [ + CreatePartitionsRequest_v0, CreatePartitionsRequest_v1, +] +CreatePartitionsResponse = [ + CreatePartitionsResponse_v0, CreatePartitionsResponse_v1, +] diff --git a/test/test_api_object_implementation.py b/test/test_api_object_implementation.py new file mode 100644 index 000000000..da80f148c --- /dev/null +++ b/test/test_api_object_implementation.py @@ -0,0 +1,18 @@ +import abc +import pytest + +from kafka.protocol.api import Request +from kafka.protocol.api import Response + + +attr_names = [n for n in dir(Request) if isinstance(getattr(Request, n), abc.abstractproperty)] +@pytest.mark.parametrize('klass', Request.__subclasses__()) +@pytest.mark.parametrize('attr_name', attr_names) +def test_request_type_conformance(klass, attr_name): + assert hasattr(klass, attr_name) + +attr_names = [n for n in dir(Response) if isinstance(getattr(Response, n), abc.abstractproperty)] +@pytest.mark.parametrize('klass', Response.__subclasses__()) +@pytest.mark.parametrize('attr_name', attr_names) +def test_response_type_conformance(klass, attr_name): + assert hasattr(klass, attr_name)