From 5366e24cdcddeb3326a394c9131936696f058990 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Albert=20=C5=81=C4=85cki?= Date: Tue, 24 Jun 2014 17:16:22 +0200 Subject: [PATCH] Added support for connection.blocked / connection.unblocked client callbacks --- pika/connection.py | 49 ++++++++++++++++++++++++++++++++++++++++++- pika/spec.py | 52 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 1 deletion(-) diff --git a/pika/connection.py b/pika/connection.py index 8cc906d6e..75f6b94a6 100644 --- a/pika/connection.py +++ b/pika/connection.py @@ -539,6 +539,8 @@ class should not be invoked directly but rather through the use of an ON_CONNECTION_CLOSED = '_on_connection_closed' ON_CONNECTION_ERROR = '_on_connection_error' ON_CONNECTION_OPEN = '_on_connection_open' + ON_CONNECTION_BLOCKED = '_on_connection_blocked' + ON_CONNECTION_UNBLOCKED = '_on_connection_unblocked' CONNECTION_CLOSED = 0 CONNECTION_INIT = 1 CONNECTION_PROTOCOL = 2 @@ -632,6 +634,25 @@ def add_on_open_error_callback(self, callback_method, remove_default=True): self._on_connection_error) self.callbacks.add(0, self.ON_CONNECTION_ERROR, callback_method, False) + def add_on_blocked_callback(self, callback_method): + """Add a callback notification when the connection has been blocked. + The callback will be passed the connection and reason received + from server + + :param method callback_method: Callback to call on blocked + + """ + self.callbacks.add(0, self.ON_CONNECTION_BLOCKED, callback_method, False) + + def add_on_unblocked_callback(self, callback_method): + """Add a callback notification when the connection has been unblocked. + The callback shold accept the connection object + + :param method callback_method: Callback to call on unblocked + + """ + self.callbacks.add(0, self.ON_CONNECTION_UNBLOCKED, callback_method, False) + def add_timeout(self, deadline, callback_method): """Adapters should override to call the callback after the specified number of seconds have elapsed, using a timer, or a @@ -838,6 +859,14 @@ def _add_connection_tune_callback(self): """Add a callback for when a Connection.Tune frame is received.""" self.callbacks.add(0, spec.Connection.Tune, self._on_connection_tune) + def _add_connection_blocked_callback(self): + """Add a callback for when a Connection.Blocked frame is received.""" + self.callbacks.add(0, spec.Connection.Blocked, self._on_connection_blocked) + + def _add_connection_unblocked_callback(self): + """Add a callback for when a Connection.Unblocked frame is received.""" + self.callbacks.add(0, spec.Connection.Unblocked, self._on_connection_unblocked) + def _append_frame_buffer(self, value): """Append the bytes to the frame buffer. @@ -880,7 +909,8 @@ def _client_properties(self): 'platform': 'Python %s' % platform.python_version(), 'capabilities': {'basic.nack': True, 'consumer_cancel_notify': True, - 'publisher_confirms': True}, + 'publisher_confirms': True, + 'connection.blocked': True}, 'information': 'See http://pika.rtfd.org', 'version': __version__} @@ -1222,6 +1252,8 @@ def _on_connection_start(self, method_frame): self._check_for_protocol_mismatch(method_frame) self._set_server_information(method_frame) self._add_connection_tune_callback() + self._add_connection_blocked_callback() + self._add_connection_unblocked_callback() self._send_connection_start_ok(*self._get_credentials(method_frame)) def _on_connection_tune(self, method_frame): @@ -1255,6 +1287,21 @@ def _on_connection_tune(self, method_frame): # Send the Connection.Open RPC call for the vhost self._send_connection_open() + def _on_connection_blocked(self, method_frame): + """This is called as a callback once we have received a Connection.Blocked + + :param pika.frame.Method method_frame: The frame received + """ + self.callbacks.process(0, self.ON_CONNECTION_BLOCKED, self, self, + method_frame.method.reason) + + def _on_connection_unblocked(self, method_frame): + """This is called as a callback once we have received a Connection.Unblocked + + :param pika.frame.Method method_frame: The frame received + """ + self.callbacks.process(0, self.ON_CONNECTION_UNBLOCKED, self, self) + def _on_data_available(self, data_in): """This is called by our Adapter, passing in the data from the socket. As long as we have buffer try and map out frame data. diff --git a/pika/spec.py b/pika/spec.py index b6cf0fe95..212454655 100644 --- a/pika/spec.py +++ b/pika/spec.py @@ -442,6 +442,56 @@ def encode(self): pieces = list() return pieces + class Blocked(amqp_object.Method): + + INDEX = 0x000A003C # 10, 60; 655420 + NAME = 'Connection.Blocked' + + def __init__(self, reason=''): + self.reason = reason + + @property + def synchronous(self): + return False + + def decode(self, encoded, offset=0): + length = struct.unpack_from('B', encoded, offset)[0] + offset += 1 + self.reason = encoded[offset:offset + length].decode('utf8') + try: + self.reason = str(self.reason) + except UnicodeEncodeError: + pass + offset += length + return self + + def encode(self): + pieces = list() + assert isinstance(self.reason, str),\ + 'A non-bytestring value was supplied for self.reason' + value = self.reason.encode('utf-8') if isinstance(self.reason, str) else self.reason + pieces.append(value) + return pieces + + class Unblocked(amqp_object.Method): + + INDEX = 0x000A003D # 10, 61; 655421 + NAME = 'Connection.Unblocked' + + def __init__(self): + pass + + @property + def synchronous(self): + return False + + def decode(self, encoded, offset=0): + return self + + def encode(self): + pieces = list() + return pieces + class Channel(amqp_object.Class): @@ -2692,6 +2742,8 @@ def encode(self): 0x000A0029: Connection.OpenOk, 0x000A0032: Connection.Close, 0x000A0033: Connection.CloseOk, + 0x000A003C: Connection.Blocked, + 0x000A003D: Connection.Unblocked, 0x0014000A: Channel.Open, 0x0014000B: Channel.OpenOk, 0x00140014: Channel.Flow,