diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index 604fab5e43d..0b908ed0a30 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -2,10 +2,23 @@ ## 7.0.0b8 (Unreleased) +**New Features** + +* Added support for `timeout` parameter on the following operations: + - `ServiceBusSender`: `send_messages`, `schedule_messages` and `cancel_scheduled_messages` + - `ServiceBusReceiver`: `receive_deferred_messages` and `peek_messages` + - `ServiceBusSession`: `get_state`, `set_state` and `renew_lock` + - `ReceivedMessage`: `renew_lock` + +**BugFixes** + +* Updated uAMQP dependency to 1.2.11. + - Fixed bug where amqp message `footer` and `delivery_annotation` were not encoded into the outgoing payload. ## 7.0.0b7 (2020-10-05) **Breaking Changes** + * Passing any type other than `ReceiveMode` as parameter `receive_mode` now throws a `TypeError` instead of `AttributeError`. * Administration Client calls now take only entity names, not `Descriptions` as well to reduce ambiguity in which entity was being acted on. TypeError will now be thrown on improper parameter types (non-string). * `AMQPMessage` (`Message.amqp_message`) properties are now read-only, changes of these properties would not be reflected in the underlying message. This may be subject to change before GA. diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py index 545fa074807..7bf24816b02 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py @@ -15,7 +15,7 @@ from urllib.parse import quote_plus import uamqp -from uamqp import utils +from uamqp import utils, compat from uamqp.message import MessageProperties from azure.core.credentials import AccessToken @@ -24,6 +24,7 @@ from .exceptions import ( ServiceBusError, ServiceBusAuthenticationError, + OperationTimeoutError, _create_servicebus_exception ) from ._common.utils import create_properties @@ -233,14 +234,14 @@ def _backoff( self, retried_times, last_exception, - timeout=None, + abs_timeout_time=None, entity_name=None ): # type: (int, Exception, Optional[float], str) -> None entity_name = entity_name or self._container_id backoff = self._config.retry_backoff_factor * 2 ** retried_times if backoff <= self._config.retry_backoff_max and ( - timeout is None or backoff <= timeout + abs_timeout_time is None or (backoff + time.time()) <= abs_timeout_time ): # pylint:disable=no-else-return time.sleep(backoff) _LOGGER.info( @@ -259,14 +260,17 @@ def _backoff( def _do_retryable_operation(self, operation, timeout=None, **kwargs): # type: (Callable, Optional[float], Any) -> Any require_last_exception = kwargs.pop("require_last_exception", False) - require_timeout = kwargs.pop("require_timeout", False) + operation_requires_timeout = kwargs.pop("operation_requires_timeout", False) retried_times = 0 max_retries = self._config.retry_total + abs_timeout_time = (time.time() + timeout) if (operation_requires_timeout and timeout) else None + while retried_times <= max_retries: try: - if require_timeout: - kwargs["timeout"] = timeout + if operation_requires_timeout and abs_timeout_time: + remaining_timeout = abs_timeout_time - time.time() + kwargs["timeout"] = remaining_timeout return operation(**kwargs) except StopIteration: raise @@ -285,13 +289,37 @@ def _do_retryable_operation(self, operation, timeout=None, **kwargs): self._backoff( retried_times=retried_times, last_exception=last_exception, - timeout=timeout + abs_timeout_time=abs_timeout_time ) - def _mgmt_request_response(self, mgmt_operation, message, callback, keep_alive_associated_link=True, **kwargs): - # type: (str, uamqp.Message, Callable, bool, Any) -> uamqp.Message + def _mgmt_request_response( + self, + mgmt_operation, + message, + callback, + keep_alive_associated_link=True, + timeout=None, + **kwargs + ): + # type: (bytes, uamqp.Message, Callable, bool, Optional[float], Any) -> uamqp.Message + """ + Execute an amqp management operation. + + :param bytes mgmt_operation: The type of operation to be performed. This value will + be service-specific, but common values include READ, CREATE and UPDATE. + This value will be added as an application property on the message. + :param message: The message to send in the management request. + :paramtype message: ~uamqp.message.Message + :param callback: The callback which is used to parse the returning message. + :paramtype callback: Callable[int, ~uamqp.message.Message, str] + :param keep_alive_associated_link: A boolean flag for keeping associated amqp sender/receiver link alive when + executing operation on mgmt links. + :param timeout: timeout in seconds executing the mgmt operation. + :rtype: None + """ self._open() application_properties = {} + # Some mgmt calls do not support an associated link name (such as list_sessions). Most do, so on by default. if keep_alive_associated_link: try: @@ -314,19 +342,23 @@ def _mgmt_request_response(self, mgmt_operation, message, callback, keep_alive_a mgmt_operation, op_type=MGMT_REQUEST_OP_TYPE_ENTITY_MGMT, node=self._mgmt_target.encode(self._config.encoding), - timeout=5000, + timeout=timeout * 1000 if timeout else None, callback=callback ) except Exception as exp: # pylint: disable=broad-except + if isinstance(exp, compat.TimeoutException): + raise OperationTimeoutError("Management operation timed out.", inner_exception=exp) raise ServiceBusError("Management request failed: {}".format(exp), exp) - def _mgmt_request_response_with_retry(self, mgmt_operation, message, callback, **kwargs): - # type: (bytes, Dict[str, Any], Callable, Any) -> Any + def _mgmt_request_response_with_retry(self, mgmt_operation, message, callback, timeout=None, **kwargs): + # type: (bytes, Dict[str, Any], Callable, Optional[float], Any) -> Any return self._do_retryable_operation( self._mgmt_request_response, mgmt_operation=mgmt_operation, message=message, callback=callback, + timeout=timeout, + operation_requires_timeout=True, **kwargs ) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/_configuration.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/_configuration.py index 996768b4bcd..05a9c960aac 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/_configuration.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/_configuration.py @@ -20,7 +20,9 @@ def __init__(self, **kwargs): if self.http_proxy else kwargs.get("transport_type", TransportType.Amqp) ) + # The following configs are not public, for internal usage only self.auth_timeout = kwargs.get("auth_timeout", 60) # type: int self.encoding = kwargs.get("encoding", "UTF-8") self.auto_reconnect = kwargs.get("auto_reconnect", True) self.keep_alive = kwargs.get("keep_alive", 30) + self.timeout = kwargs.get("timeout", 60) # type: float diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py index 4b1b6eee331..8d3de77eecc 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py @@ -618,7 +618,6 @@ def _to_outgoing_message(self): via_partition_key=self.via_partition_key ) - @property def dead_letter_error_description(self): # type: () -> Optional[str] @@ -1043,8 +1042,8 @@ def defer(self): self._settle_message(MESSAGE_DEFER) self._settled = True - def renew_lock(self): - # type: () -> datetime.datetime + def renew_lock(self, **kwargs): + # type: (Any) -> datetime.datetime # pylint: disable=protected-access,no-member """Renew the message lock. @@ -1060,6 +1059,8 @@ def renew_lock(self): Lock renewal can be performed as a background task by registering the message with an `azure.servicebus.AutoLockRenew` instance. + :keyword float timeout: The total operation timeout in seconds including all the retries. The value must be + greater than 0 if specified. The default value is None, meaning no timeout. :returns: The utc datetime the lock is set to expire at. :rtype: datetime.datetime :raises: TypeError if the message is sessionful. @@ -1067,7 +1068,7 @@ def renew_lock(self): :raises: ~azure.servicebus.exceptions.MessageAlreadySettled is message has already been settled. """ try: - if self._receiver.session: # type: ignore + if self._receiver.session: # type: ignore raise TypeError("Session messages cannot be renewed. Please renew the Session lock instead.") except AttributeError: pass @@ -1076,8 +1077,12 @@ def renew_lock(self): if not token: raise ValueError("Unable to renew lock - no lock token found.") - expiry = self._receiver._renew_locks(token) # type: ignore - self._expiry = utc_from_timestamp(expiry[MGMT_RESPONSE_MESSAGE_EXPIRATION][0]/1000.0) # type: datetime.datetime + timeout = kwargs.pop("timeout", None) + if timeout is not None and timeout <= 0: + raise ValueError("The timeout must be greater than 0.") + + expiry = self._receiver._renew_locks(token, timeout=timeout) # type: ignore + self._expiry = utc_from_timestamp(expiry[MGMT_RESPONSE_MESSAGE_EXPIRATION][0]/1000.0) # type: datetime.datetime return self._expiry diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py index 9341fbc87ba..5d7d92cd147 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py @@ -210,11 +210,6 @@ def _open(self): self.close() raise - def close(self): - # type: () -> None - super(ServiceBusReceiver, self).close() - self._message_iter = None # pylint: disable=attribute-defined-outside-init - def _receive(self, max_message_count=None, timeout=None): # type: (Optional[int], Optional[float]) -> List[ReceivedMessage] # pylint: disable=protected-access @@ -276,15 +271,22 @@ def _settle_message(self, settlement, lock_tokens, dead_letter_details=None): mgmt_handlers.default ) - def _renew_locks(self, *lock_tokens): - # type: (str) -> Any + def _renew_locks(self, *lock_tokens, **kwargs): + # type: (str, Any) -> Any + timeout = kwargs.pop("timeout", None) message = {MGMT_REQUEST_LOCK_TOKENS: types.AMQPArray(lock_tokens)} return self._mgmt_request_response_with_retry( REQUEST_RESPONSE_RENEWLOCK_OPERATION, message, - mgmt_handlers.lock_renew_op + mgmt_handlers.lock_renew_op, + timeout=timeout ) + def close(self): + # type: () -> None + super(ServiceBusReceiver, self).close() + self._message_iter = None # pylint: disable=attribute-defined-outside-init + def get_streaming_message_iter(self, max_wait_time=None): # type: (float) -> Iterator[ReceivedMessage] """Receive messages from an iterator indefinitely, or if a max_wait_time is specified, until @@ -413,11 +415,11 @@ def receive_messages(self, max_message_count=None, max_wait_time=None): self._receive, max_message_count=max_message_count, timeout=max_wait_time, - require_timeout=True + operation_requires_timeout=True ) - def receive_deferred_messages(self, sequence_numbers): - # type: (Union[int,List[int]]) -> List[ReceivedMessage] + def receive_deferred_messages(self, sequence_numbers, **kwargs): + # type: (Union[int,List[int]], Any) -> List[ReceivedMessage] """Receive messages that have previously been deferred. When receiving deferred messages from a partitioned entity, all of the supplied @@ -425,6 +427,8 @@ def receive_deferred_messages(self, sequence_numbers): :param Union[int,List[int]] sequence_numbers: A list of the sequence numbers of messages that have been deferred. + :keyword float timeout: The total operation timeout in seconds including all the retries. The value must be + greater than 0 if specified. The default value is None, meaning no timeout. :rtype: List[~azure.servicebus.ReceivedMessage] .. admonition:: Example: @@ -438,6 +442,9 @@ def receive_deferred_messages(self, sequence_numbers): """ self._check_live() + timeout = kwargs.pop("timeout", None) + if timeout is not None and timeout <= 0: + raise ValueError("The timeout must be greater than 0.") if isinstance(sequence_numbers, six.integer_types): sequence_numbers = [sequence_numbers] if not sequence_numbers: @@ -458,12 +465,13 @@ def receive_deferred_messages(self, sequence_numbers): messages = self._mgmt_request_response_with_retry( REQUEST_RESPONSE_RECEIVE_BY_SEQUENCE_NUMBER, message, - handler + handler, + timeout=timeout ) return messages - def peek_messages(self, max_message_count=1, sequence_number=None): - # type: (int, Optional[int]) -> List[PeekedMessage] + def peek_messages(self, max_message_count=1, **kwargs): + # type: (int, Any) -> List[PeekedMessage] """Browse messages currently pending in the queue. Peeked messages are not removed from queue, nor are they locked. They cannot be completed, @@ -471,7 +479,9 @@ def peek_messages(self, max_message_count=1, sequence_number=None): :param int max_message_count: The maximum number of messages to try and peek. The default value is 1. - :param int sequence_number: A message sequence number from which to start browsing messages. + :keyword int sequence_number: A message sequence number from which to start browsing messages. + :keyword float timeout: The total operation timeout in seconds including all the retries. The value must be + greater than 0 if specified. The default value is None, meaning no timeout. :rtype: List[~azure.servicebus.PeekedMessage] @@ -486,6 +496,10 @@ def peek_messages(self, max_message_count=1, sequence_number=None): """ self._check_live() + sequence_number = kwargs.pop("sequence_number", 0) + timeout = kwargs.pop("timeout", None) + if timeout is not None and timeout <= 0: + raise ValueError("The timeout must be greater than 0.") if not sequence_number: sequence_number = self._last_received_sequenced_number or 1 if int(max_message_count) < 1: @@ -504,5 +518,6 @@ def peek_messages(self, max_message_count=1, sequence_number=None): return self._mgmt_request_response_with_retry( REQUEST_RESPONSE_PEEK_OPERATION, message, - mgmt_handlers.peek_op + mgmt_handlers.peek_op, + timeout=timeout ) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py index d664f1165ae..a7eb179d009 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py @@ -48,18 +48,18 @@ def _create_attribute(self): self.entity_name = self._entity_name def _set_msg_timeout(self, timeout=None, last_exception=None): + # pylint: disable=protected-access if not timeout: + self._handler._msg_timeout = 0 return - timeout_time = time.time() + timeout - remaining_time = timeout_time - time.time() - if remaining_time <= 0.0: + if timeout <= 0.0: if last_exception: error = last_exception else: error = OperationTimeoutError("Send operation timed out") _LOGGER.info("%r send operation timed out. (%r)", self._name, error) raise error - self._handler._msg_timeout = remaining_time * 1000 # type: ignore # pylint: disable=protected-access + self._handler._msg_timeout = timeout * 1000 # type: ignore @classmethod def _build_schedule_request(cls, schedule_time_utc, *messages): @@ -188,17 +188,23 @@ def _open(self): def _send(self, message, timeout=None, last_exception=None): # type: (Message, Optional[float], Exception) -> None self._open() - self._set_msg_timeout(timeout, last_exception) - self._handler.send_message(message.message) + default_timeout = self._handler._msg_timeout # pylint: disable=protected-access + try: + self._set_msg_timeout(timeout, last_exception) + self._handler.send_message(message.message) + finally: # reset the timeout of the handler back to the default value + self._set_msg_timeout(default_timeout, None) - def schedule_messages(self, messages, schedule_time_utc): - # type: (Union[Message, List[Message]], datetime.datetime) -> List[int] + def schedule_messages(self, messages, schedule_time_utc, **kwargs): + # type: (Union[Message, List[Message]], datetime.datetime, Any) -> List[int] """Send Message or multiple Messages to be enqueued at a specific time. Returns a list of the sequence numbers of the enqueued messages. :param messages: The message or list of messages to schedule. :type messages: Union[~azure.servicebus.Message, List[~azure.servicebus.Message]] :param schedule_time_utc: The utc date and time to enqueue the messages. :type schedule_time_utc: ~datetime.datetime + :keyword float timeout: The total operation timeout in seconds including all the retries. The value must be + greater than 0 if specified. The default value is None, meaning no timeout. :rtype: List[int] .. admonition:: Example: @@ -212,6 +218,9 @@ def schedule_messages(self, messages, schedule_time_utc): """ # pylint: disable=protected-access self._open() + timeout = kwargs.pop("timeout", None) + if timeout is not None and timeout <= 0: + raise ValueError("The timeout must be greater than 0.") if isinstance(messages, Message): request_body = self._build_schedule_request(schedule_time_utc, messages) else: @@ -219,16 +228,19 @@ def schedule_messages(self, messages, schedule_time_utc): return self._mgmt_request_response_with_retry( REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION, request_body, - mgmt_handlers.schedule_op + mgmt_handlers.schedule_op, + timeout=timeout ) - def cancel_scheduled_messages(self, sequence_numbers): - # type: (Union[int, List[int]]) -> None + def cancel_scheduled_messages(self, sequence_numbers, **kwargs): + # type: (Union[int, List[int]], Any) -> None """ Cancel one or more messages that have previously been scheduled and are still pending. :param sequence_numbers: The sequence numbers of the scheduled messages. :type sequence_numbers: int or list[int] + :keyword float timeout: The total operation timeout in seconds including all the retries. The value must be + greater than 0 if specified. The default value is None, meaning no timeout. :rtype: None :raises: ~azure.servicebus.exceptions.ServiceBusError if messages cancellation failed due to message already cancelled or enqueued. @@ -243,6 +255,9 @@ def cancel_scheduled_messages(self, sequence_numbers): :caption: Cancelling messages scheduled to be sent in future """ self._open() + timeout = kwargs.pop("timeout", None) + if timeout is not None and timeout <= 0: + raise ValueError("The timeout must be greater than 0.") if isinstance(sequence_numbers, int): numbers = [types.AMQPLong(sequence_numbers)] else: @@ -251,7 +266,8 @@ def cancel_scheduled_messages(self, sequence_numbers): return self._mgmt_request_response_with_retry( REQUEST_RESPONSE_CANCEL_SCHEDULED_MESSAGE_OPERATION, request_body, - mgmt_handlers.default + mgmt_handlers.default, + timeout=timeout ) @classmethod @@ -299,8 +315,8 @@ def from_connection_string( ) return cls(**constructor_args) - def send_messages(self, message): - # type: (Union[Message, BatchMessage, List[Message]]) -> None + def send_messages(self, message, **kwargs): + # type: (Union[Message, BatchMessage, List[Message]], Any) -> None """Sends message and blocks until acknowledgement is received or operation times out. If a list of messages was provided, attempts to send them as a single batch, throwing a @@ -308,6 +324,8 @@ def send_messages(self, message): :param message: The ServiceBus message to be sent. :type message: ~azure.servicebus.Message or ~azure.servicebus.BatchMessage or list[~azure.servicebus.Message] + :keyword float timeout: The total operation timeout in seconds including all the retries. The value must be + greater than 0 if specified. The default value is None, meaning no timeout. :rtype: None :raises: :class: ~azure.servicebus.exceptions.OperationTimeoutError if sending times out. @@ -328,6 +346,9 @@ def send_messages(self, message): :caption: Send message. """ + timeout = kwargs.pop("timeout", None) + if timeout is not None and timeout <= 0: + raise ValueError("The timeout must be greater than 0.") message = transform_messages_to_sendable_if_needed(message) try: batch = self.create_batch() @@ -343,7 +364,8 @@ def send_messages(self, message): self._do_retryable_operation( self._send, message=message, - require_timeout=True, + timeout=timeout, + operation_requires_timeout=True, require_last_exception=True ) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session.py index a5387a29341..5ed76d9d3cb 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session.py @@ -4,7 +4,7 @@ # -------------------------------------------------------------------------------------------- import logging import datetime -from typing import TYPE_CHECKING, Union, Optional +from typing import TYPE_CHECKING, Union, Optional, Any import six from ._common.utils import utc_from_timestamp, utc_now @@ -90,12 +90,15 @@ class ServiceBusSession(BaseSession): :caption: Get session from a receiver """ - def get_state(self): - # type: () -> str + def get_state(self, **kwargs): + # type: (Any) -> str + # pylint: disable=protected-access """Get the session state. Returns None if no state has been set. + :keyword float timeout: The total operation timeout in seconds including all the retries. The value must be + greater than 0 if specified. The default value is None, meaning no timeout. :rtype: str .. admonition:: Example: @@ -108,22 +111,29 @@ def get_state(self): :caption: Get the session state """ self._check_live() - response = self._receiver._mgmt_request_response_with_retry( # pylint: disable=protected-access + timeout = kwargs.pop("timeout", None) + if timeout is not None and timeout <= 0: + raise ValueError("The timeout must be greater than 0.") + response = self._receiver._mgmt_request_response_with_retry( REQUEST_RESPONSE_GET_SESSION_STATE_OPERATION, {MGMT_REQUEST_SESSION_ID: self._session_id}, - mgmt_handlers.default + mgmt_handlers.default, + timeout=timeout ) - session_state = response.get(MGMT_RESPONSE_SESSION_STATE) + session_state = response.get(MGMT_RESPONSE_SESSION_STATE) # type: ignore if isinstance(session_state, six.binary_type): session_state = session_state.decode(self._encoding) return session_state - def set_state(self, state): - # type: (Union[str, bytes, bytearray]) -> None + def set_state(self, state, **kwargs): + # type: (Union[str, bytes, bytearray], Any) -> None + # pylint: disable=protected-access """Set the session state. :param state: The state value. :type state: Union[str, bytes, bytearray] + :keyword float timeout: The total operation timeout in seconds including all the retries. The value must be + greater than 0 if specified. The default value is None, meaning no timeout. .. admonition:: Example: @@ -135,15 +145,20 @@ def set_state(self, state): :caption: Set the session state """ self._check_live() + timeout = kwargs.pop("timeout", None) + if timeout is not None and timeout <= 0: + raise ValueError("The timeout must be greater than 0.") state = state.encode(self._encoding) if isinstance(state, six.text_type) else state - return self._receiver._mgmt_request_response_with_retry( # pylint: disable=protected-access + return self._receiver._mgmt_request_response_with_retry( # type: ignore REQUEST_RESPONSE_SET_SESSION_STATE_OPERATION, {MGMT_REQUEST_SESSION_ID: self._session_id, MGMT_REQUEST_SESSION_STATE: bytearray(state)}, - mgmt_handlers.default + mgmt_handlers.default, + timeout=timeout ) - def renew_lock(self): - # type: () -> datetime.datetime + def renew_lock(self, **kwargs): + # type: (Any) -> datetime.datetime + # pylint: disable=protected-access """Renew the session lock. This operation must be performed periodically in order to retain a lock on the @@ -154,6 +169,8 @@ def renew_lock(self): This operation can also be performed as a threaded background task by registering the session with an `azure.servicebus.AutoLockRenew` instance. + :keyword float timeout: The total operation timeout in seconds including all the retries. The value must be + greater than 0 if specified. The default value is None, meaning no timeout. :returns: The utc datetime the lock is set to expire at. :rtype: datetime.datetime @@ -167,12 +184,16 @@ def renew_lock(self): :caption: Renew the session lock before it expires """ self._check_live() - expiry = self._receiver._mgmt_request_response_with_retry( # pylint: disable=protected-access + timeout = kwargs.pop("timeout", None) + if timeout is not None and timeout <= 0: + raise ValueError("The timeout must be greater than 0.") + expiry = self._receiver._mgmt_request_response_with_retry( REQUEST_RESPONSE_RENEW_SESSION_LOCK_OPERATION, {MGMT_REQUEST_SESSION_ID: self._session_id}, - mgmt_handlers.default + mgmt_handlers.default, + timeout=timeout ) - expiry_timestamp = expiry[MGMT_RESPONSE_RECEIVER_EXPIRATION]/1000.0 - self._locked_until_utc = utc_from_timestamp(expiry_timestamp) # type: datetime.datetime + expiry_timestamp = expiry[MGMT_RESPONSE_RECEIVER_EXPIRATION]/1000.0 # type: ignore + self._locked_until_utc = utc_from_timestamp(expiry_timestamp) # type: datetime.datetime return self._locked_until_utc diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session_receiver.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session_receiver.py index 51381600da6..1799c5059e1 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session_receiver.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_session_receiver.py @@ -10,11 +10,11 @@ from ._servicebus_session import ServiceBusSession if TYPE_CHECKING: - import datetime from azure.core.credentials import TokenCredential _LOGGER = logging.getLogger(__name__) + class ServiceBusSessionReceiver(ServiceBusReceiver, SessionReceiverMixin): """The ServiceBusSessionReceiver class defines a high level interface for receiving messages from the Azure Service Bus Queue or Topic Subscription diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py index b5a25f44cf4..afaa5327527 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py @@ -5,7 +5,7 @@ # ------------------------------------------------------------------------- import logging import datetime -from typing import Optional +from typing import Any, Optional from .._common import message as sync_message from .._common.constants import ( @@ -129,7 +129,7 @@ async def defer(self) -> None: # type: ignore await self._settle_message(MESSAGE_DEFER) self._settled = True - async def renew_lock(self) -> datetime.datetime: + async def renew_lock(self, **kwargs: Any) -> datetime.datetime: # pylint: disable=protected-access """Renew the message lock. @@ -140,6 +140,8 @@ async def renew_lock(self) -> datetime.datetime: background task by registering the message with an `azure.servicebus.aio.AutoLockRenew` instance. This operation is only available for non-sessionful messages. + :keyword float timeout: The total operation timeout in seconds including all the retries. The value must be + greater than 0 if specified. The default value is None, meaning no timeout. :returns: The utc datetime the lock is set to expire at. :rtype: datetime.datetime :raises: TypeError if the message is sessionful. @@ -157,7 +159,11 @@ async def renew_lock(self) -> datetime.datetime: if not token: raise ValueError("Unable to renew lock - no lock token found.") - expiry = await self._receiver._renew_locks(token) # type: ignore - self._expiry = utc_from_timestamp(expiry[MGMT_RESPONSE_MESSAGE_EXPIRATION][0]/1000.0) # type: datetime.datetime + timeout = kwargs.pop("timeout", None) + if timeout is not None and timeout <= 0: + raise ValueError("The timeout must be greater than 0.") + + expiry = await self._receiver._renew_locks(token, timeout=timeout) # type: ignore + self._expiry = utc_from_timestamp(expiry[MGMT_RESPONSE_MESSAGE_EXPIRATION][0]/1000.0) # type: datetime.datetime return self._expiry diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py index d6fb5d3722a..c9ac9923c81 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py @@ -5,9 +5,11 @@ import logging import asyncio import uuid -from typing import TYPE_CHECKING, Any +import time +from typing import TYPE_CHECKING, Any, Callable, Optional, Dict import uamqp +from uamqp import compat from uamqp.message import MessageProperties from azure.core.credentials import AccessToken @@ -22,6 +24,7 @@ CONTAINER_PREFIX, MANAGEMENT_PATH_SUFFIX) from ..exceptions import ( ServiceBusError, + OperationTimeoutError, _create_servicebus_exception ) @@ -89,7 +92,7 @@ def __init__( self._container_id = CONTAINER_PREFIX + str(uuid.uuid4())[:8] self._config = Configuration(**kwargs) self._running = False - self._handler = None # type: uamqp.AMQPClient + self._handler = None # type: uamqp.AMQPClientAsync self._auth_uri = None self._properties = create_properties(self._config.user_agent) @@ -124,13 +127,13 @@ async def _backoff( self, retried_times, last_exception, - timeout=None, + abs_timeout_time=None, entity_name=None ): entity_name = entity_name or self._container_id backoff = self._config.retry_backoff_factor * 2 ** retried_times if backoff <= self._config.retry_backoff_max and ( - timeout is None or backoff <= timeout + abs_timeout_time is None or (backoff + time.time()) <= abs_timeout_time ): await asyncio.sleep(backoff) _LOGGER.info( @@ -147,42 +150,65 @@ async def _backoff( raise last_exception async def _do_retryable_operation(self, operation, timeout=None, **kwargs): + # type: (Callable, Optional[float], Any) -> Any require_last_exception = kwargs.pop("require_last_exception", False) - require_timeout = kwargs.pop("require_timeout", False) + operation_requires_timeout = kwargs.pop("operation_requires_timeout", False) retried_times = 0 - last_exception = None max_retries = self._config.retry_total + abs_timeout_time = (time.time() + timeout) if (operation_requires_timeout and timeout) else None + while retried_times <= max_retries: try: - if require_last_exception: - kwargs["last_exception"] = last_exception - if require_timeout: - kwargs["timeout"] = timeout + if operation_requires_timeout and abs_timeout_time: + remaining_timeout = abs_timeout_time - time.time() + kwargs["timeout"] = remaining_timeout return await operation(**kwargs) except StopAsyncIteration: raise except Exception as exception: # pylint: disable=broad-except last_exception = await self._handle_exception(exception) + if require_last_exception: + kwargs["last_exception"] = last_exception retried_times += 1 if retried_times > max_retries: - break + _LOGGER.info( + "%r operation has exhausted retry. Last exception: %r.", + self._container_id, + last_exception, + ) + raise last_exception await self._backoff( retried_times=retried_times, last_exception=last_exception, - timeout=timeout + abs_timeout_time=abs_timeout_time ) - _LOGGER.info( - "%r operation has exhausted retry. Last exception: %r.", - self._container_id, - last_exception, - ) - raise last_exception - async def _mgmt_request_response( - self, mgmt_operation, message, callback, keep_alive_associated_link=True, **kwargs + self, + mgmt_operation, + message, + callback, + keep_alive_associated_link=True, + timeout=None, + **kwargs ): + # type: (bytes, uamqp.Message, Callable, bool, Optional[float], Any) -> uamqp.Message + """ + Execute an amqp management operation. + + :param bytes mgmt_operation: The type of operation to be performed. This value will + be service-specific, but common values include READ, CREATE and UPDATE. + This value will be added as an application property on the message. + :param message: The message to send in the management request. + :paramtype message: ~uamqp.message.Message + :param callback: The callback which is used to parse the returning message. + :paramtype callback: Callable[int, ~uamqp.message.Message, str] + :param keep_alive_associated_link: A boolean flag for keeping associated amqp sender/receiver link alive when + executing operation on mgmt links. + :param timeout: timeout in seconds for executing the mgmt operation. + :rtype: None + """ await self._open() application_properties = {} @@ -206,17 +232,22 @@ async def _mgmt_request_response( mgmt_operation, op_type=MGMT_REQUEST_OP_TYPE_ENTITY_MGMT, node=self._mgmt_target.encode(self._config.encoding), - timeout=5000, + timeout=timeout * 1000 if timeout else None, callback=callback) except Exception as exp: # pylint: disable=broad-except + if isinstance(exp, compat.TimeoutException): + raise OperationTimeoutError("Management operation timed out.", inner_exception=exp) raise ServiceBusError("Management request failed: {}".format(exp), exp) - async def _mgmt_request_response_with_retry(self, mgmt_operation, message, callback, **kwargs): + async def _mgmt_request_response_with_retry(self, mgmt_operation, message, callback, timeout=None, **kwargs): + # type: (bytes, Dict[str, Any], Callable, Optional[float], Any) -> Any return await self._do_retryable_operation( self._mgmt_request_response, mgmt_operation=mgmt_operation, message=message, callback=callback, + timeout=timeout, + operation_requires_timeout=True, **kwargs ) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py index 83134591c84..fee1dde97a3 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py @@ -137,11 +137,11 @@ def from_connection_string( if token and token_expiry: credential = ServiceBusSASTokenCredential(token, token_expiry) elif policy and key: - credential = ServiceBusSharedKeyCredential(policy, key) # type: ignore + credential = ServiceBusSharedKeyCredential(policy, key) # type: ignore return cls( fully_qualified_namespace=host, entity_name=entity_in_conn_str or kwargs.pop("entity_name", None), - credential=credential, # type: ignore + credential=credential, # type: ignore **kwargs ) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py index c6081c2557f..099a64a8245 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py @@ -14,6 +14,7 @@ from uamqp.constants import SenderSettleMode from ._base_handler_async import BaseHandler +from .._common.message import PeekedMessage from ._async_message import ReceivedMessage from .._common.receiver_mixins import ReceiverMixin from .._common.constants import ( @@ -208,13 +209,6 @@ async def _open(self): await self.close() raise - - async def close(self): - # type: () -> None - await super(ServiceBusReceiver, self).close() - self._message_iter = None - - async def _receive(self, max_message_count=None, timeout=None): # type: (Optional[int], Optional[float]) -> List[ReceivedMessage] # pylint: disable=protected-access @@ -274,14 +268,21 @@ async def _settle_message(self, settlement, lock_tokens, dead_letter_details=Non mgmt_handlers.default ) - async def _renew_locks(self, *lock_tokens): + async def _renew_locks(self, *lock_tokens, timeout=None): + # type: (str, Optional[float]) -> Any message = {MGMT_REQUEST_LOCK_TOKENS: types.AMQPArray(lock_tokens)} return await self._mgmt_request_response_with_retry( REQUEST_RESPONSE_RENEWLOCK_OPERATION, message, - mgmt_handlers.lock_renew_op + mgmt_handlers.lock_renew_op, + timeout=timeout ) + async def close(self): + # type: () -> None + await super(ServiceBusReceiver, self).close() + self._message_iter = None + def get_streaming_message_iter(self, max_wait_time: float = None) -> AsyncIterator[ReceivedMessage]: """Receive messages from an iterator indefinitely, or if a max_wait_time is specified, until such a timeout occurs. @@ -406,11 +407,11 @@ async def receive_messages(self, max_message_count=None, max_wait_time=None): self._receive, max_message_count=max_message_count, timeout=max_wait_time, - require_timeout=True + operation_requires_timeout=True ) - async def receive_deferred_messages(self, sequence_numbers): - # type: (Union[int, List[int]]) -> List[ReceivedMessage] + async def receive_deferred_messages(self, sequence_numbers, **kwargs): + # type: (Union[int, List[int]], Any) -> List[ReceivedMessage] """Receive messages that have previously been deferred. When receiving deferred messages from a partitioned entity, all of the supplied @@ -418,6 +419,8 @@ async def receive_deferred_messages(self, sequence_numbers): :param Union[int, list[int]] sequence_numbers: A list of the sequence numbers of messages that have been deferred. + :keyword float timeout: The total operation timeout in seconds including all the retries. The value must be + greater than 0 if specified. The default value is None, meaning no timeout. :rtype: list[~azure.servicebus.aio.ReceivedMessage] .. admonition:: Example: @@ -431,6 +434,9 @@ async def receive_deferred_messages(self, sequence_numbers): """ self._check_live() + timeout = kwargs.pop("timeout", None) + if timeout is not None and timeout <= 0: + raise ValueError("The timeout must be greater than 0.") if isinstance(sequence_numbers, six.integer_types): sequence_numbers = [sequence_numbers] if not sequence_numbers: @@ -454,11 +460,13 @@ async def receive_deferred_messages(self, sequence_numbers): messages = await self._mgmt_request_response_with_retry( REQUEST_RESPONSE_RECEIVE_BY_SEQUENCE_NUMBER, message, - handler + handler, + timeout=timeout ) return messages - async def peek_messages(self, max_message_count=1, sequence_number=0): + async def peek_messages(self, max_message_count=1, **kwargs): + # type: (int, Optional[float]) -> List[PeekedMessage] """Browse messages currently pending in the queue. Peeked messages are not removed from queue, nor are they locked. They cannot be completed, @@ -466,7 +474,9 @@ async def peek_messages(self, max_message_count=1, sequence_number=0): :param int max_message_count: The maximum number of messages to try and peek. The default value is 1. - :param int sequence_number: A message sequence number from which to start browsing messages. + :keyword int sequence_number: A message sequence number from which to start browsing messages. + :keyword float timeout: The total operation timeout in seconds including all the retries. The value must be + greater than 0 if specified. The default value is None, meaning no timeout. :rtype: list[~azure.servicebus.PeekedMessage] .. admonition:: Example: @@ -479,6 +489,10 @@ async def peek_messages(self, max_message_count=1, sequence_number=0): :caption: Peek messages in the queue. """ self._check_live() + sequence_number = kwargs.pop("sequence_number", 0) + timeout = kwargs.pop("timeout", None) + if timeout is not None and timeout <= 0: + raise ValueError("The timeout must be greater than 0.") if not sequence_number: sequence_number = self._last_received_sequenced_number or 1 if int(max_message_count) < 1: @@ -498,5 +512,6 @@ async def peek_messages(self, max_message_count=1, sequence_number=0): return await self._mgmt_request_response_with_retry( REQUEST_RESPONSE_PEEK_OPERATION, message, - mgmt_handlers.peek_op + mgmt_handlers.peek_op, + timeout=timeout ) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py index 082a9371bf6..483de912fca 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py @@ -4,7 +4,7 @@ # -------------------------------------------------------------------------------------------- import logging import asyncio -from typing import Any, TYPE_CHECKING, Union, List +from typing import Any, TYPE_CHECKING, Union, List, Optional import uamqp from uamqp import SendClientAsync, types @@ -131,17 +131,23 @@ async def _open(self): async def _send(self, message, timeout=None, last_exception=None): await self._open() - self._set_msg_timeout(timeout, last_exception) - await self._handler.send_message_async(message.message) + default_timeout = self._handler._msg_timeout # pylint: disable=protected-access + try: + self._set_msg_timeout(timeout, last_exception) + await self._handler.send_message_async(message.message) + finally: # reset the timeout of the handler back to the default value + self._set_msg_timeout(default_timeout, None) - async def schedule_messages(self, messages, schedule_time_utc): - # type: (Union[Message, List[Message]], datetime.datetime) -> List[int] + async def schedule_messages(self, messages, schedule_time_utc, **kwargs): + # type: (Union[Message, List[Message]], datetime.datetime, Any) -> List[int] """Send Message or multiple Messages to be enqueued at a specific time by the service. Returns a list of the sequence numbers of the enqueued messages. :param messages: The message or list of messages to schedule. :type messages: ~azure.servicebus.Message or list[~azure.servicebus.Message] :param schedule_time_utc: The utc date and time to enqueue the messages. :type schedule_time_utc: ~datetime.datetime + :keyword float timeout: The total operation timeout in seconds including all the retries. The value must be + greater than 0 if specified. The default value is None, meaning no timeout. :rtype: list[int] .. admonition:: Example: @@ -155,6 +161,9 @@ async def schedule_messages(self, messages, schedule_time_utc): """ # pylint: disable=protected-access await self._open() + timeout = kwargs.pop("timeout", None) + if timeout is not None and timeout <= 0: + raise ValueError("The timeout must be greater than 0.") if isinstance(messages, Message): request_body = self._build_schedule_request(schedule_time_utc, messages) else: @@ -162,16 +171,19 @@ async def schedule_messages(self, messages, schedule_time_utc): return await self._mgmt_request_response_with_retry( REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION, request_body, - mgmt_handlers.schedule_op + mgmt_handlers.schedule_op, + timeout=timeout ) - async def cancel_scheduled_messages(self, sequence_numbers): - # type: (Union[int, List[int]]) -> None + async def cancel_scheduled_messages(self, sequence_numbers, **kwargs): + # type: (Union[int, List[int]], Any) -> None """ Cancel one or more messages that have previously been scheduled and are still pending. :param sequence_numbers: The sequence numbers of the scheduled messages. :type sequence_numbers: int or list[int] + :keyword float timeout: The total operation timeout in seconds including all the retries. The value must be + greater than 0 if specified. The default value is None, meaning no timeout. :rtype: None :raises: ~azure.servicebus.exceptions.ServiceBusError if messages cancellation failed due to message already cancelled or enqueued. @@ -186,6 +198,9 @@ async def cancel_scheduled_messages(self, sequence_numbers): :caption: Cancelling messages scheduled to be sent in future """ await self._open() + timeout = kwargs.pop("timeout", None) + if timeout is not None and timeout <= 0: + raise ValueError("The timeout must be greater than 0.") if isinstance(sequence_numbers, int): numbers = [types.AMQPLong(sequence_numbers)] else: @@ -194,7 +209,8 @@ async def cancel_scheduled_messages(self, sequence_numbers): return await self._mgmt_request_response_with_retry( REQUEST_RESPONSE_CANCEL_SCHEDULED_MESSAGE_OPERATION, request_body, - mgmt_handlers.default + mgmt_handlers.default, + timeout=timeout ) @classmethod @@ -237,8 +253,8 @@ def from_connection_string( ) return cls(**constructor_args) - async def send_messages(self, message): - # type: (Union[Message, BatchMessage, List[Message]]) -> None + async def send_messages(self, message, **kwargs): + # type: (Union[Message, BatchMessage, List[Message]], Any) -> None """Sends message and blocks until acknowledgement is received or operation times out. If a list of messages was provided, attempts to send them as a single batch, throwing a @@ -246,6 +262,8 @@ async def send_messages(self, message): :param message: The ServiceBus message to be sent. :type message: ~azure.servicebus.Message or ~azure.servicebus.BatchMessage or list[~azure.servicebus.Message] + :keyword float timeout: The total operation timeout in seconds including all the retries. The value must be + greater than 0 if specified. The default value is None, meaning no timeout. :rtype: None :raises: :class: ~azure.servicebus.exceptions.OperationTimeoutError if sending times out. @@ -266,6 +284,9 @@ async def send_messages(self, message): :caption: Send message. """ + timeout = kwargs.pop("timeout", None) + if timeout is not None and timeout <= 0: + raise ValueError("The timeout must be greater than 0.") message = transform_messages_to_sendable_if_needed(message) try: batch = await self.create_batch() @@ -281,7 +302,8 @@ async def send_messages(self, message): await self._do_retryable_operation( self._send, message=message, - require_timeout=True, + timeout=timeout, + operation_requires_timeout=True, require_last_exception=True ) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_async.py index c4bca78bd16..deec3792fa3 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_session_async.py @@ -4,7 +4,7 @@ # -------------------------------------------------------------------------------------------- import logging import datetime -from typing import Union +from typing import Union, Any import six from .._servicebus_session import BaseSession @@ -40,12 +40,14 @@ class ServiceBusSession(BaseSession): :caption: Get session from a receiver """ - async def get_state(self): - # type: () -> str + async def get_state(self, **kwargs): + # type: (Any) -> str """Get the session state. Returns None if no state has been set. + :keyword float timeout: The total operation timeout in seconds including all the retries. The value must be + greater than 0 if specified. The default value is None, meaning no timeout. :rtype: str .. admonition:: Example: @@ -58,22 +60,28 @@ async def get_state(self): :caption: Get the session state """ self._check_live() + timeout = kwargs.pop("timeout", None) + if timeout is not None and timeout <= 0: + raise ValueError("The timeout must be greater than 0.") response = await self._receiver._mgmt_request_response_with_retry( # pylint: disable=protected-access REQUEST_RESPONSE_GET_SESSION_STATE_OPERATION, {MGMT_REQUEST_SESSION_ID: self._session_id}, - mgmt_handlers.default + mgmt_handlers.default, + timeout=timeout ) session_state = response.get(MGMT_RESPONSE_SESSION_STATE) if isinstance(session_state, six.binary_type): session_state = session_state.decode('UTF-8') return session_state - async def set_state(self, state): - # type: (Union[str, bytes, bytearray]) -> None + async def set_state(self, state, **kwargs): + # type: (Union[str, bytes, bytearray], Any) -> None """Set the session state. :param state: The state value. :type state: Union[str, bytes, bytearray] + :keyword float timeout: The total operation timeout in seconds including all the retries. The value must be + greater than 0 if specified. The default value is None, meaning no timeout. :rtype: None .. admonition:: Example: @@ -86,15 +94,19 @@ async def set_state(self, state): :caption: Set the session state """ self._check_live() + timeout = kwargs.pop("timeout", None) + if timeout is not None and timeout <= 0: + raise ValueError("The timeout must be greater than 0.") state = state.encode(self._encoding) if isinstance(state, six.text_type) else state return await self._receiver._mgmt_request_response_with_retry( # pylint: disable=protected-access REQUEST_RESPONSE_SET_SESSION_STATE_OPERATION, {MGMT_REQUEST_SESSION_ID: self._session_id, MGMT_REQUEST_SESSION_STATE: bytearray(state)}, - mgmt_handlers.default + mgmt_handlers.default, + timeout=timeout ) - async def renew_lock(self): - # type: () -> datetime.datetime + async def renew_lock(self, **kwargs): + # type: (Any) -> datetime.datetime """Renew the session lock. This operation must be performed periodically in order to retain a lock on the @@ -105,6 +117,8 @@ async def renew_lock(self): This operation can also be performed as a threaded background task by registering the session with an `azure.servicebus.aio.AutoLockRenew` instance. + :keyword float timeout: The total operation timeout in seconds including all the retries. The value must be + greater than 0 if specified. The default value is None, meaning no timeout. :returns: The utc datetime the lock is set to expire at. :rtype: datetime @@ -118,10 +132,14 @@ async def renew_lock(self): :caption: Renew the session lock before it expires """ self._check_live() + timeout = kwargs.pop("timeout", None) + if timeout is not None and timeout <= 0: + raise ValueError("The timeout must be greater than 0.") expiry = await self._receiver._mgmt_request_response_with_retry( # pylint: disable=protected-access REQUEST_RESPONSE_RENEW_SESSION_LOCK_OPERATION, {MGMT_REQUEST_SESSION_ID: self._session_id}, - mgmt_handlers.default + mgmt_handlers.default, + timeout=timeout ) expiry_timestamp = expiry[MGMT_RESPONSE_RECEIVER_EXPIRATION]/1000.0 self._locked_until_utc = utc_from_timestamp(expiry_timestamp) # type: datetime.datetime diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/exceptions.py b/sdk/servicebus/azure-servicebus/azure/servicebus/exceptions.py index 5540709362d..202730ddcfa 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/exceptions.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/exceptions.py @@ -81,7 +81,10 @@ def _create_servicebus_exception(logger, exception, handler): # pylint: disable error_need_raise = True elif isinstance(exception, errors.MessageException): logger.info("Message send failed (%r)", exception) - error = MessageSendFailed(exception) + if exception.condition == constants.ErrorCodes.ClientError and 'timed out' in str(exception): + error = OperationTimeoutError("Send operation timed out", inner_exception=exception) + else: + error = MessageSendFailed(exception) error_need_raise = False elif isinstance(exception, errors.LinkDetach) and exception.condition == SESSION_LOCK_LOST: try: diff --git a/sdk/servicebus/azure-servicebus/setup.py b/sdk/servicebus/azure-servicebus/setup.py index a66b6aec3de..84802e7402b 100644 --- a/sdk/servicebus/azure-servicebus/setup.py +++ b/sdk/servicebus/azure-servicebus/setup.py @@ -78,7 +78,7 @@ 'azure', ]), install_requires=[ - 'uamqp>=1.2.10,<2.0.0', + 'uamqp>=1.2.11,<2.0.0', 'azure-common~=1.1', 'msrest>=0.6.17,<2.0.0', 'azure-core<2.0.0,>=1.6.0', diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py index 2fcb31fe61f..5cc213603be 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py @@ -8,11 +8,14 @@ import logging import sys import os +import types import pytest import time import uuid from datetime import datetime, timedelta +import uamqp +from uamqp import compat from azure.servicebus.aio import ( ServiceBusClient, ReceivedMessage, @@ -29,7 +32,9 @@ AutoLockRenewTimeout, MessageSendFailed, MessageSettleFailed, - MessageContentTooLarge) + MessageContentTooLarge, + OperationTimeoutError +) from devtools_testutils import AzureMgmtTestCase, CachedResourceGroupPreparer from servicebus_preparer import CachedServiceBusNamespacePreparer, CachedServiceBusQueuePreparer, ServiceBusQueuePreparer from utilities import get_logger, print_message, sleep_until_expired @@ -52,7 +57,7 @@ async def test_async_queue_by_queue_client_conn_str_receive_handler_peeklock(sel async with sb_client.get_queue_sender(servicebus_queue.name) as sender: for i in range(10): message = Message("Handler message no. {}".format(i)) - await sender.send_messages(message) + await sender.send_messages(message, timeout=5) with pytest.raises(ServiceBusConnectionError): await (sb_client.get_queue_session_receiver(servicebus_queue.name, session_id="test", max_wait_time=5))._open_with_retry() @@ -66,7 +71,6 @@ async def test_async_queue_by_queue_client_conn_str_receive_handler_peeklock(sel assert count == 10 - @pytest.mark.liveTest @pytest.mark.live_test_only @CachedResourceGroupPreparer(name_prefix='servicebustest') @@ -81,6 +85,7 @@ async def test_async_queue_by_queue_client_send_multiple_messages(self, serviceb message = Message("Handler message no. {}".format(i)) messages.append(message) await sender.send_messages(messages) + assert sender._handler._msg_timeout == 0 async with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) as receiver: count = 0 @@ -91,7 +96,6 @@ async def test_async_queue_by_queue_client_send_multiple_messages(self, serviceb assert count == 10 - @pytest.mark.liveTest @pytest.mark.live_test_only @CachedResourceGroupPreparer() @@ -133,7 +137,6 @@ async def test_github_issue_6178_async(self, servicebus_namespace_connection_str await message.complete() await asyncio.sleep(40) - @pytest.mark.liveTest @pytest.mark.live_test_only @CachedResourceGroupPreparer(name_prefix='servicebustest') @@ -327,7 +330,7 @@ async def test_async_queue_by_servicebus_client_iter_messages_with_retrieve_defe assert count == 10 - deferred = await receiver.receive_deferred_messages(deferred_messages) + deferred = await receiver.receive_deferred_messages(deferred_messages, timeout=5) assert len(deferred) == 10 for message in deferred: assert isinstance(message, ReceivedMessage) @@ -360,6 +363,8 @@ async def test_async_queue_by_servicebus_client_iter_messages_with_retrieve_defe assert count == 10 async with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) as session: + with pytest.raises(ValueError): + await session.receive_deferred_messages(deferred_messages, timeout=0) deferred = await session.receive_deferred_messages(deferred_messages) assert len(deferred) == 10 for message in deferred: @@ -395,7 +400,7 @@ async def test_async_queue_by_servicebus_client_iter_messages_with_retrieve_defe assert count == 10 async with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) as session: - deferred = await session.receive_deferred_messages(deferred_messages) + deferred = await session.receive_deferred_messages(deferred_messages, timeout=None) assert len(deferred) == 10 for message in deferred: assert isinstance(message, ReceivedMessage) @@ -628,7 +633,7 @@ async def test_async_queue_by_servicebus_client_browse_messages_with_receiver(se message = Message("Test message no. {}".format(i)) await sender.send_messages(message) - messages = await receiver.peek_messages(5) + messages = await receiver.peek_messages(5, timeout=5) assert len(messages) > 0 assert all(isinstance(m, PeekedMessage) for m in messages) for message in messages: @@ -883,7 +888,7 @@ async def test_async_queue_message_lock_renew(self, servicebus_namespace_connect messages = await receiver.receive_messages(max_wait_time=10) assert len(messages) == 1 time.sleep(15) - await messages[0].renew_lock() + await messages[0].renew_lock(timeout=5) time.sleep(15) await messages[0].renew_lock() time.sleep(15) @@ -1019,7 +1024,7 @@ async def test_async_queue_schedule_multiple_messages(self, servicebus_namespace received_messages.append(message) await message.complete() - tokens = await sender.schedule_messages(received_messages, scheduled_enqueue_time) + tokens = await sender.schedule_messages(received_messages, scheduled_enqueue_time, timeout=5) assert len(tokens) == 2 messages = await receiver.receive_messages(max_wait_time=120) @@ -1056,7 +1061,7 @@ async def test_async_queue_cancel_scheduled_messages(self, servicebus_namespace_ tokens = await sender.schedule_messages([message_a, message_b], enqueue_time) assert len(tokens) == 2 - await sender.cancel_scheduled_messages(tokens) + await sender.cancel_scheduled_messages(tokens, timeout=None) messages = await receiver.receive_messages(max_wait_time=120) assert len(messages) == 0 @@ -1444,3 +1449,66 @@ async def test_async_queue_send_twice(self, servicebus_namespace_connection_stri async for message in receiver: messages.append(message) assert len(messages) == 2 + + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer(name_prefix='servicebustest') + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @CachedServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True) + async def test_async_queue_send_timeout(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs): + async def _hack_amqp_sender_run_async(cls): + await asyncio.sleep(6) # sleep until timeout + await cls.message_handler.work_async() + cls._waiting_messages = 0 + cls._pending_messages = cls._filter_pending() + if cls._backoff and not cls._waiting_messages: + _logger.info("Client told to backoff - sleeping for %r seconds", cls._backoff) + await cls._connection.sleep_async(cls._backoff) + cls._backoff = 0 + await cls._connection.work_async() + return True + + async with ServiceBusClient.from_connection_string( + servicebus_namespace_connection_string, logging_enable=False) as sb_client: + async with sb_client.get_queue_sender(servicebus_queue.name) as sender: + # this one doesn't need to reset the method, as it's hacking the method on the instance + sender._handler._client_run_async = types.MethodType(_hack_amqp_sender_run_async, sender._handler) + with pytest.raises(OperationTimeoutError): + await sender.send_messages(Message("body"), timeout=5) + + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer(name_prefix='servicebustest') + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @CachedServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True) + async def test_async_queue_mgmt_operation_timeout(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs): + async def hack_mgmt_execute_async(self, operation, op_type, message, timeout=0): + start_time = self._counter.get_current_ms() + operation_id = str(uuid.uuid4()) + self._responses[operation_id] = None + + await asyncio.sleep(6) # sleep until timeout + while not self._responses[operation_id] and not self.mgmt_error: + if timeout > 0: + now = self._counter.get_current_ms() + if (now - start_time) >= timeout: + raise compat.TimeoutException("Failed to receive mgmt response in {}ms".format(timeout)) + await self.connection.work_async() + if self.mgmt_error: + raise self.mgmt_error + response = self._responses.pop(operation_id) + return response + + original_execute_method = uamqp.async_ops.mgmt_operation_async.MgmtOperationAsync.execute_async + # hack the mgmt method on the class, not on an instance, so it needs reset + try: + uamqp.async_ops.mgmt_operation_async.MgmtOperationAsync.execute_async = hack_mgmt_execute_async + async with ServiceBusClient.from_connection_string( + servicebus_namespace_connection_string, logging_enable=False) as sb_client: + async with sb_client.get_queue_sender(servicebus_queue.name) as sender: + with pytest.raises(OperationTimeoutError): + scheduled_time_utc = utc_now() + timedelta(seconds=30) + await sender.schedule_messages(Message("Message to be scheduled"), scheduled_time_utc, timeout=5) + finally: + # must reset the mgmt execute method, otherwise other test cases would use the hacked execute method, leading to timeout error + uamqp.async_ops.mgmt_operation_async.MgmtOperationAsync.execute_async = original_execute_method diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py index 616561373b2..cd751fceaf8 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py @@ -467,7 +467,7 @@ async def test_async_session_by_servicebus_client_renew_client_locks(self, servi assert m.lock_token is not None time.sleep(5) initial_expiry = receiver.session.locked_until_utc - await receiver.session.renew_lock() + await receiver.session.renew_lock(timeout=5) assert (receiver.session.locked_until_utc - initial_expiry) >= timedelta(seconds=5) finally: await messages[0].complete() @@ -733,8 +733,8 @@ async def test_async_session_get_set_state_with_receiver(self, servicebus_namesp await sender.send_messages(message) async with sb_client.get_queue_session_receiver(servicebus_queue.name, session_id=session_id, max_wait_time=5) as session: - assert await session.session.get_state() == None - await session.session.set_state("first_state") + assert await session.session.get_state(timeout=5) == None + await session.session.set_state("first_state", timeout=5) count = 0 async for m in session: assert m.session_id == session_id diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index a5c01c337db..ea03cceb0d0 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -7,6 +7,7 @@ import logging import sys import os +import types import pytest import time import uuid @@ -14,6 +15,7 @@ import calendar import uamqp +from uamqp import compat from azure.servicebus import ServiceBusClient, AutoLockRenew, TransportType from azure.servicebus._common.message import Message, PeekedMessage, ReceivedMessage, BatchMessage from azure.servicebus._common.constants import ( @@ -33,7 +35,9 @@ AutoLockRenewTimeout, MessageSendFailed, MessageSettleFailed, - MessageContentTooLarge) + MessageContentTooLarge, + OperationTimeoutError +) from devtools_testutils import AzureMgmtTestCase, CachedResourceGroupPreparer from servicebus_preparer import CachedServiceBusNamespacePreparer, ServiceBusQueuePreparer, CachedServiceBusQueuePreparer @@ -1910,4 +1914,68 @@ def test_message_inner_amqp_properties(self, servicebus_namespace_connection_str # # delivery_annotations and footer disabled pending uamqp bug https://github.com/Azure/azure-uamqp-python/issues/169 # #assert message.amqp_message.delivery_annotations[b"delivery_annotations"] == 3 # assert message.amqp_message.header.priority == 5 - # #assert message.amqp_message.footer[b"footer"] == 6 \ No newline at end of file + # #assert message.amqp_message.footer[b"footer"] == 6 + + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer(name_prefix='servicebustest') + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @CachedServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True) + def test_queue_send_timeout(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs): + def _hack_amqp_sender_run(cls): + time.sleep(6) # sleep until timeout + cls.message_handler.work() + cls._waiting_messages = 0 + cls._pending_messages = cls._filter_pending() + if cls._backoff and not cls._waiting_messages: + _logger.info("Client told to backoff - sleeping for %r seconds", cls._backoff) + cls._connection.sleep(cls._backoff) + cls._backoff = 0 + cls._connection.work() + return True + + with ServiceBusClient.from_connection_string( + servicebus_namespace_connection_string, logging_enable=False) as sb_client: + with sb_client.get_queue_sender(servicebus_queue.name) as sender: + # this one doesn't need to reset the method, as it's hacking the method on the instance + sender._handler._client_run = types.MethodType(_hack_amqp_sender_run, sender._handler) + with pytest.raises(OperationTimeoutError): + sender.send_messages(Message("body"), timeout=5) + + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer(name_prefix='servicebustest') + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @CachedServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True) + def test_queue_mgmt_operation_timeout(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs): + def hack_mgmt_execute(self, operation, op_type, message, timeout=0): + start_time = self._counter.get_current_ms() + operation_id = str(uuid.uuid4()) + self._responses[operation_id] = None + + time.sleep(6) # sleep until timeout + while not self._responses[operation_id] and not self.mgmt_error: + if timeout > 0: + now = self._counter.get_current_ms() + if (now - start_time) >= timeout: + raise compat.TimeoutException("Failed to receive mgmt response in {}ms".format(timeout)) + self.connection.work() + if self.mgmt_error: + raise self.mgmt_error + response = self._responses.pop(operation_id) + return response + + original_execute_method = uamqp.mgmt_operation.MgmtOperation.execute + # hack the mgmt method on the class, not on an instance, so it needs reset + + try: + uamqp.mgmt_operation.MgmtOperation.execute = hack_mgmt_execute + with ServiceBusClient.from_connection_string( + servicebus_namespace_connection_string, logging_enable=False) as sb_client: + with sb_client.get_queue_sender(servicebus_queue.name) as sender: + with pytest.raises(OperationTimeoutError): + scheduled_time_utc = utc_now() + timedelta(seconds=30) + sender.schedule_messages(Message("Message to be scheduled"), scheduled_time_utc, timeout=5) + finally: + # must reset the mgmt execute method, otherwise other test cases would use the hacked execute method, leading to timeout error + uamqp.mgmt_operation.MgmtOperation.execute = original_execute_method diff --git a/sdk/servicebus/azure-servicebus/tests/test_sessions.py b/sdk/servicebus/azure-servicebus/tests/test_sessions.py index bf065a565bd..a8ff86695eb 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_sessions.py +++ b/sdk/servicebus/azure-servicebus/tests/test_sessions.py @@ -555,7 +555,7 @@ def test_session_by_servicebus_client_renew_client_locks(self, servicebus_namesp assert m.lock_token is not None time.sleep(5) initial_expiry = receiver.session._locked_until_utc - receiver.session.renew_lock() + receiver.session.renew_lock(timeout=5) assert (receiver.session._locked_until_utc - initial_expiry) >= timedelta(seconds=5) finally: messages[0].complete() @@ -730,7 +730,7 @@ def test_session_schedule_message(self, servicebus_namespace_connection_string, count = 0 while not messages and count < 12: messages = receiver.receive_messages(max_wait_time=10) - receiver.session.renew_lock() + receiver.session.renew_lock(timeout=None) count += 1 data = str(messages[0]) @@ -769,7 +769,7 @@ def test_session_schedule_multiple_messages(self, servicebus_namespace_connectio messages = [] count = 0 while len(messages) < 2 and count < 12: - receiver.session.renew_lock() + receiver.session.renew_lock(timeout=None) messages = receiver.receive_messages(max_wait_time=15) time.sleep(5) count += 1 @@ -829,8 +829,8 @@ def test_session_get_set_state_with_receiver(self, servicebus_namespace_connecti sender.send_messages(message) with sb_client.get_queue_session_receiver(servicebus_queue.name, session_id=session_id, max_wait_time=5) as session: - assert session.session.get_state() == None - session.session.set_state("first_state") + assert session.session.get_state(timeout=5) == None + session.session.set_state("first_state", timeout=5) count = 0 for m in session: assert m.session_id == session_id diff --git a/shared_requirements.txt b/shared_requirements.txt index 905c8c79783..f64d4834757 100644 --- a/shared_requirements.txt +++ b/shared_requirements.txt @@ -154,7 +154,7 @@ opentelemetry-api==0.13b0 #override azure-eventhub-checkpointstoreblob-aio aiohttp<4.0,>=3.0 #override azure-eventhub uamqp<2.0,>=1.2.7 #override azure-appconfiguration msrest>=0.6.10 -#override azure-servicebus uamqp>=1.2.10,<2.0.0 +#override azure-servicebus uamqp>=1.2.11,<2.0.0 #override azure-servicebus msrest>=0.6.17,<2.0.0 #override azure-servicebus azure-core<2.0.0,>=1.6.0 #override azure-search-documents msrest>=0.6.10