Skip to content

Commit

Permalink
python 5.0.3 (#937)
Browse files Browse the repository at this point in the history
* update to 5.0.3
  • Loading branch information
zhouli11 authored Feb 11, 2025
1 parent 6501701 commit 0425196
Show file tree
Hide file tree
Showing 25 changed files with 1,113 additions and 460 deletions.
44 changes: 36 additions & 8 deletions python/rocketmq/v5/client/balancer/queue_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,36 @@ def __init__(self, queues, selector_type=NONE_TYPE_SELECTOR):

@classmethod
def producer_queue_selector(cls, topic_route: TopicRouteData):
    """Build a selector over the route's producer-usable queues.

    Keeps only queues that are writable AND hosted on a master broker,
    and tags the selector as PRODUCER_QUEUE_SELECTOR.

    NOTE(review): SOURCE is a rendered diff; this body resolves the
    duplicated pre-/post-change lines to the post-commit version.
    """
    return cls(
        list(
            filter(
                lambda queue: queue.is_writable() and queue.is_master_broker(),
                topic_route.message_queues,
            )
        ),
        QueueSelector.PRODUCER_QUEUE_SELECTOR,
    )

@classmethod
def simple_consumer_queue_selector(cls, topic_route: TopicRouteData):
    """Build a selector over the route's consumer-usable queues.

    Keeps only queues that are readable AND hosted on a master broker,
    and tags the selector as SIMPLE_CONSUMER_QUEUE_SELECTOR.

    NOTE(review): SOURCE is a rendered diff; this body resolves the
    duplicated pre-/post-change lines to the post-commit version.
    """
    return cls(
        list(
            filter(
                lambda queue: queue.is_readable() and queue.is_master_broker(),
                topic_route.message_queues,
            )
        ),
        QueueSelector.SIMPLE_CONSUMER_QUEUE_SELECTOR,
    )

def select_next_queue(self):
    """Return the next message queue in round-robin order.

    Raises:
        IllegalArgumentException: if this selector was created with
            NONE_TYPE_SELECTOR (no producer/consumer filter applied).

    NOTE(review): diff duplication (old + new raise/return lines)
    resolved to the post-commit version. Assumes the queue list is
    non-empty — an empty list would raise ZeroDivisionError here;
    confirm upstream guarantees at least one queue.
    """
    if self.__selector_type == QueueSelector.NONE_TYPE_SELECTOR:
        raise IllegalArgumentException(
            "error type for queue selector, type is NONE_TYPE_SELECTOR."
        )
    # atomic counter modulo queue count implements simple round-robin
    return self.__message_queues[
        self.__index.get_and_increment() % len(self.__message_queues)
    ]

def all_queues(self):
index = self.__index.get_and_increment() % len(self.__message_queues)
def update(self, topic_route: TopicRouteData):
    """Refresh the cached queue list from a new topic route.

    No-op when the route's queues equal the cached list, or when the
    selector type matches neither branch (e.g. NONE_TYPE_SELECTOR).

    NOTE(review): reconstructed from a rendered diff — duplicated
    pre-/post-change assignment lines resolved to the post-commit form.
    """
    if topic_route.message_queues == self.__message_queues:
        return
    if self.__selector_type == QueueSelector.PRODUCER_QUEUE_SELECTOR:
        # producers require writable queues on master brokers
        self.__message_queues = list(
            filter(
                lambda queue: queue.is_writable() and queue.is_master_broker(),
                topic_route.message_queues,
            )
        )
    elif self.__selector_type == QueueSelector.SIMPLE_CONSUMER_QUEUE_SELECTOR:
        # consumers require readable queues on master brokers
        self.__message_queues = list(
            filter(
                lambda queue: queue.is_readable() and queue.is_master_broker(),
                topic_route.message_queues,
            )
        )
235 changes: 158 additions & 77 deletions python/rocketmq/v5/client/client.py

Large diffs are not rendered by default.

17 changes: 13 additions & 4 deletions python/rocketmq/v5/client/client_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@ def sk(self):

class ClientConfiguration:

def __init__(self, endpoints: str, credentials: Credentials, namespace="", request_timeout=3):
self.__rpc_endpoints = RpcEndpoints(ClientConfiguration.__parse_endpoints(endpoints))
def __init__(
self, endpoints: str, credentials: Credentials, namespace="", request_timeout=3
):
self.__rpc_endpoints = RpcEndpoints(
ClientConfiguration.__parse_endpoints(endpoints)
)
self.__credentials = credentials
self.__request_timeout = request_timeout # seconds
self.__namespace = namespace
Expand All @@ -52,7 +56,10 @@ def __parse_endpoints(endpoints_str):
endpoints = Endpoints()
addresses = endpoints_str.split(";")
endpoints.scheme = ClientConfiguration.__parse_endpoints_scheme_type(
ClientConfiguration.__parse_endpoints_prefix(addresses[0].split(":")[0]))
ClientConfiguration.__parse_endpoints_prefix(
addresses[0].split(":")[0]
)
)
for address in addresses:
if len(address) == 0:
continue
Expand All @@ -62,7 +69,9 @@ def __parse_endpoints(endpoints_str):
ad.port = int(address.split(":")[1])
return endpoints
except Exception as e:
logger.error(f"client configuration parse {endpoints_str} exception: {e}")
logger.error(
f"client configuration parse {endpoints_str} exception: {e}"
)
return None

@staticmethod
Expand Down
79 changes: 57 additions & 22 deletions python/rocketmq/v5/client/connection/rpc_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,13 @@ class RpcEndpoints:
def __init__(self, endpoints: Endpoints):
    """Wrap a protobuf Endpoints message for RPC addressing.

    Raises:
        UnsupportedException: when the scheme is DOMAIN_NAME but more
            than one address is supplied (domain scheme allows one).

    NOTE(review): reconstructed from a rendered diff — duplicated
    pre-/post-change lines resolved to the post-commit version.
    """
    self.__endpoints = endpoints
    self.__scheme = endpoints.scheme
    self.__addresses = set(
        map(lambda address: RpcAddress(address), endpoints.addresses)
    )
    if self.__scheme == AddressScheme.DOMAIN_NAME and len(self.__addresses) > 1:
        raise UnsupportedException(
            "Multiple addresses not allowed in domain schema"
        )
    # NOTE(review): this calls the private __facade() method, then rebinds
    # the same (mangled) name to an instance attribute, shadowing the
    # method for later attribute access — confirm this shadowing is intended.
    self.__facade, self.__endpoint_desc = self.__facade()

def __hash__(self) -> int:
Expand All @@ -79,8 +83,11 @@ def __str__(self):
""" private """

def __facade(self):
if self.__scheme is None or len(
self.__addresses) == 0 or self.__scheme == AddressScheme.ADDRESS_SCHEME_UNSPECIFIED:
if (
self.__scheme is None
or len(self.__addresses) == 0
or self.__scheme == AddressScheme.ADDRESS_SCHEME_UNSPECIFIED
):
return ""

prefix = "dns:"
Expand All @@ -94,7 +101,7 @@ def __facade(self):
ret = ""
for address in sorted_list:
ret = ret + address.__str__() + ","
return prefix + ret[0: len(ret) - 1], ret[0: len(ret) - 1]
return prefix + ret[0:len(ret) - 1], ret[0:len(ret) - 1]

""" property """

Expand Down Expand Up @@ -123,22 +130,33 @@ async def start_stream_read(self):
if res.HasField("settings"):
# read a response for send setting result
if res is not None and res.status.code == Code.OK:
logger.debug(f"async setting success. response status code: {res.status.code}")
if res.settings is not None and res.settings.metric is not None:
logger.debug(
f"{ self.__handler.__str__()} sync setting success. response status code: {res.status.code}"
)
if (
res.settings is not None
and res.settings.metric is not None
):
# reset metrics if needed
self.__handler.reset_metric(res.settings.metric)
elif res.HasField("recover_orphaned_transaction_command"):
# sever check for a transaction message
if self.__handler is not None:
transaction_id = res.recover_orphaned_transaction_command.transaction_id
transaction_id = (
res.recover_orphaned_transaction_command.transaction_id
)
message = res.recover_orphaned_transaction_command.message
await self.__handler.on_recover_orphaned_transaction_command(self.__endpoints, message,
transaction_id)
await self.__handler.on_recover_orphaned_transaction_command(
self.__endpoints, message, transaction_id
)
except AioRpcError as e:
logger.warn(
f"stream read from endpoints {self.__endpoints.__str__()} occurred AioRpcError. code: {e.code()}, message: {e.details()}")
f"{ self.__handler.__str__()} read stream from endpoints {self.__endpoints.__str__()} occurred AioRpcError. code: {e.code()}, message: {e.details()}"
)
except Exception as e:
logger.error(f"stream read from endpoints {self.__endpoints.__str__()} exception, {e}")
logger.error(
f"{ self.__handler.__str__()} read stream from endpoints {self.__endpoints.__str__()} exception, {e}"
)

async def stream_write(self, req):
if self.__stream_stream_call is not None:
Expand All @@ -164,6 +182,7 @@ def __init__(self, endpoints: RpcEndpoints, tls_enabled=False):

def create_channel(self, loop):
    """Bind *loop* as the current event loop and create the grpc aio channel on it."""
    # create grpc channel with the given loop
    # assert loop == RpcClient._io_loop
    asyncio.set_event_loop(loop)
    self.__create_aio_channel()

Expand All @@ -173,7 +192,9 @@ def close_channel(self, loop):
if self.__telemetry_stream_stream_call is not None:
self.__telemetry_stream_stream_call.close()
self.__telemetry_stream_stream_call = None
logger.info(f"channel[{self.__endpoints.__str__()}] close stream_stream_call success.")
logger.info(
f"channel[{self.__endpoints.__str__()}] close stream_stream_call success."
)
if self.channel_state() is not ChannelConnectivity.SHUTDOWN:
# close grpc channel
asyncio.run_coroutine_threadsafe(self.__async_channel.close(), loop)
Expand All @@ -189,29 +210,43 @@ def channel_state(self, wait_for_ready=True):
def register_telemetry_stream_stream_call(self, stream_stream_call, handler):
    """Install a new telemetry stream-stream call, replacing any existing one.

    NOTE(review): reconstructed from a rendered diff — duplicated
    pre-/post-change assignment resolved to the post-commit version.
    """
    if self.__telemetry_stream_stream_call is not None:
        # only one telemetry stream per channel: close the old call first
        self.__telemetry_stream_stream_call.close()
    self.__telemetry_stream_stream_call = RpcStreamStreamCall(
        self.__endpoints, stream_stream_call, handler
    )

""" private """

def __create_aio_channel(self):
    """Create the grpc.aio channel and messaging stub for this endpoint set.

    Uses a TLS channel when __tls_enabled is set, otherwise an insecure one.

    Raises:
        IllegalArgumentException: when endpoints is None.
        Exception: any channel-creation failure is logged and re-raised.

    NOTE(review): reconstructed from a rendered diff — duplicated
    pre-/post-change lines resolved to the post-commit version.
    """
    try:
        if self.__endpoints is None:
            raise IllegalArgumentException(
                "create_aio_channel exception, endpoints is None"
            )
        else:
            # no grpc-level retries; unlimited send/receive message sizes
            options = [
                ("grpc.enable_retries", 0),
                ("grpc.max_send_message_length", -1),
                ("grpc.max_receive_message_length", -1),
            ]
            if self.__tls_enabled:
                self.__async_channel = aio.secure_channel(
                    self.__endpoints.facade, grpc.ssl_channel_credentials(), options
                )
            else:
                self.__async_channel = aio.insecure_channel(
                    self.__endpoints.facade, options
                )
            self.__async_stub = MessagingServiceStub(self.__async_channel)
            logger.debug(
                f"create_aio_channel to [{self.__endpoints.__str__()}] success. channel state:{self.__async_channel.get_state()}"
            )
    except Exception as e:
        logger.error(
            f"create_aio_channel to [{self.__endpoints.__str__()}] exception: {e}"
        )
        raise e

#
""" property """

@property
Expand Down
Loading

0 comments on commit 0425196

Please sign in to comment.