Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for connection.blocked / connection.unblocked client callbacks #3

Open
wants to merge 1 commit into
base: python3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 48 additions & 1 deletion pika/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,8 @@ class should not be invoked directly but rather through the use of an
ON_CONNECTION_CLOSED = '_on_connection_closed'
ON_CONNECTION_ERROR = '_on_connection_error'
ON_CONNECTION_OPEN = '_on_connection_open'
ON_CONNECTION_BLOCKED = '_on_connection_blocked'
ON_CONNECTION_UNBLOCKED = '_on_connection_unblocked'
CONNECTION_CLOSED = 0
CONNECTION_INIT = 1
CONNECTION_PROTOCOL = 2
Expand Down Expand Up @@ -632,6 +634,25 @@ def add_on_open_error_callback(self, callback_method, remove_default=True):
self._on_connection_error)
self.callbacks.add(0, self.ON_CONNECTION_ERROR, callback_method, False)

def add_on_blocked_callback(self, callback_method):
"""Add a callback notification when the connection has been blocked.
The callback will be passed the connection and reason received
from server

:param method callback_method: Callback to call on blocked

"""
self.callbacks.add(0, self.ON_CONNECTION_BLOCKED, callback_method, False)

def add_on_unblocked_callback(self, callback_method):
"""Add a callback notification when the connection has been unblocked.
The callback shold accept the connection object

:param method callback_method: Callback to call on unblocked

"""
self.callbacks.add(0, self.ON_CONNECTION_UNBLOCKED, callback_method, False)

def add_timeout(self, deadline, callback_method):
"""Adapters should override to call the callback after the
specified number of seconds have elapsed, using a timer, or a
Expand Down Expand Up @@ -838,6 +859,14 @@ def _add_connection_tune_callback(self):
"""Add a callback for when a Connection.Tune frame is received."""
self.callbacks.add(0, spec.Connection.Tune, self._on_connection_tune)

def _add_connection_blocked_callback(self):
"""Add a callback for when a Connection.Blocked frame is received."""
self.callbacks.add(0, spec.Connection.Blocked, self._on_connection_blocked)

def _add_connection_unblocked_callback(self):
"""Add a callback for when a Connection.Unblocked frame is received."""
self.callbacks.add(0, spec.Connection.Unblocked, self._on_connection_unblocked)

def _append_frame_buffer(self, value):
"""Append the bytes to the frame buffer.

Expand Down Expand Up @@ -880,7 +909,8 @@ def _client_properties(self):
'platform': 'Python %s' % platform.python_version(),
'capabilities': {'basic.nack': True,
'consumer_cancel_notify': True,
'publisher_confirms': True},
'publisher_confirms': True,
'connection.blocked': True},
'information': 'See http://pika.rtfd.org',
'version': __version__}

Expand Down Expand Up @@ -1222,6 +1252,8 @@ def _on_connection_start(self, method_frame):
self._check_for_protocol_mismatch(method_frame)
self._set_server_information(method_frame)
self._add_connection_tune_callback()
self._add_connection_blocked_callback()
self._add_connection_unblocked_callback()
self._send_connection_start_ok(*self._get_credentials(method_frame))

def _on_connection_tune(self, method_frame):
Expand Down Expand Up @@ -1255,6 +1287,21 @@ def _on_connection_tune(self, method_frame):
# Send the Connection.Open RPC call for the vhost
self._send_connection_open()

def _on_connection_blocked(self, method_frame):
"""This is called as a callback once we have received a Connection.Blocked

:param pika.frame.Method method_frame: The frame received
"""
self.callbacks.process(0, self.ON_CONNECTION_BLOCKED, self, self,
method_frame.method.reason)

def _on_connection_unblocked(self, method_frame):
"""This is called as a callback once we have received a Connection.Unblocked

:param pika.frame.Method method_frame: The frame received
"""
self.callbacks.process(0, self.ON_CONNECTION_UNBLOCKED, self, self)

def _on_data_available(self, data_in):
"""This is called by our Adapter, passing in the data from the socket.
As long as we have buffer try and map out frame data.
Expand Down
52 changes: 52 additions & 0 deletions pika/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,56 @@ def encode(self):
pieces = list()
return pieces

class Blocked(amqp_object.Method):

INDEX = 0x000A003C # 10, 60; 655420
NAME = 'Connection.Blocked'

def __init__(self, reason=''):
self.reason = reason

@property
def synchronous(self):
return False

def decode(self, encoded, offset=0):
length = struct.unpack_from('B', encoded, offset)[0]
offset += 1
self.reason = encoded[offset:offset + length].decode('utf8')
try:
self.reason = str(self.reason)
except UnicodeEncodeError:
pass
offset += length
return self

def encode(self):
pieces = list()
assert isinstance(self.reason, str),\
'A non-bytestring value was supplied for self.reason'
value = self.reason.encode('utf-8') if isinstance(self.reason, str) else self.reason
pieces.append(value)
return pieces

class Unblocked(amqp_object.Method):

INDEX = 0x000A003D # 10, 61; 655421
NAME = 'Connection.Unblocked'

def __init__(self):
pass

@property
def synchronous(self):
return False

def decode(self, encoded, offset=0):
return self

def encode(self):
pieces = list()
return pieces


class Channel(amqp_object.Class):

Expand Down Expand Up @@ -2692,6 +2742,8 @@ def encode(self):
0x000A0029: Connection.OpenOk,
0x000A0032: Connection.Close,
0x000A0033: Connection.CloseOk,
0x000A003C: Connection.Blocked,
0x000A003D: Connection.Unblocked,
0x0014000A: Channel.Open,
0x0014000B: Channel.OpenOk,
0x00140014: Channel.Flow,
Expand Down