diff --git a/libs/messaging/hive/messaging/connection.py b/libs/messaging/hive/messaging/connection.py index 91116cf..771c9de 100644 --- a/libs/messaging/hive/messaging/connection.py +++ b/libs/messaging/hive/messaging/connection.py @@ -3,6 +3,10 @@ class Connection(WrappedPikaThing): + def __init__(self, *args, **kwargs): + self.on_channel_open = kwargs.pop("on_channel_open", None) + super().__init__(*args, **kwargs) + def __enter__(self): return self @@ -20,4 +24,6 @@ def channel(self, *args, **kwargs): channel = Channel(self._pika.channel(*args, **kwargs)) if confirm_delivery: channel.confirm_delivery() # Don't fail silently. + if self.on_channel_open: + self.on_channel_open(channel) return channel diff --git a/libs/messaging/hive/messaging/message_bus.py b/libs/messaging/hive/messaging/message_bus.py index 684adca..bb5f7fa 100644 --- a/libs/messaging/hive/messaging/message_bus.py +++ b/libs/messaging/hive/messaging/message_bus.py @@ -71,9 +71,13 @@ def queue_connection_parameters( ) def blocking_connection(self, **kwargs) -> Connection: + on_channel_open = kwargs.pop("on_channel_open", None) params = self.queue_connection_parameters(**kwargs) try: - return Connection(BlockingConnection(params)) + return Connection( + BlockingConnection(params), + on_channel_open=on_channel_open, + ) except AMQPConnectionError as e: e = getattr(e, "args", [None])[0] e = getattr(e, "exception", None)