Skip to content

Commit

Permalink
hive-messaging: Add channel-open callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
gbenson committed Sep 24, 2024
1 parent ca67753 commit 6eedf6a
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 1 deletion.
6 changes: 6 additions & 0 deletions libs/messaging/hive/messaging/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@


class Connection(WrappedPikaThing):
def __init__(self, *args, **kwargs):
self.on_channel_open = kwargs.pop("on_channel_open", None)
super().__init__(*args, **kwargs)

def __enter__(self):
return self

Expand All @@ -20,4 +24,6 @@ def channel(self, *args, **kwargs):
channel = Channel(self._pika.channel(*args, **kwargs))
if confirm_delivery:
channel.confirm_delivery() # Don't fail silently.
if self.on_channel_open:
self.on_channel_open(channel)
return channel
6 changes: 5 additions & 1 deletion libs/messaging/hive/messaging/message_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,13 @@ def queue_connection_parameters(
)

def blocking_connection(self, **kwargs) -> Connection:
on_channel_open = kwargs.pop("on_channel_open", None)
params = self.queue_connection_parameters(**kwargs)
try:
return Connection(BlockingConnection(params))
return Connection(
BlockingConnection(params),
on_channel_open=on_channel_open,
)
except AMQPConnectionError as e:
e = getattr(e, "args", [None])[0]
e = getattr(e, "exception", None)
Expand Down

0 comments on commit 6eedf6a

Please sign in to comment.