diff --git a/CHANGELOG.md b/CHANGELOG.md index 4a99eb2..6cbf9be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - New extra install `-E websockets`, additionally a convenience `-E all` option - `context_hook` in Base Manager that can be used to bind structlog contextvars for all workers (inbound, outbound, poll) - `connect_hook` and `disconnect_hook` for Websocket manager +- `callback_hook` that takes in `QueuedMessage` and `Callback`, useful for adding things like topic to contextvars for callback functions ### Changed - Use `managed_service_fixtures` for Redis tests diff --git a/sending/base.py b/sending/base.py index 97f7b41..ccf9dbe 100644 --- a/sending/base.py +++ b/sending/base.py @@ -68,15 +68,25 @@ def __init__(self): if not hasattr(self, "inbound_message_hook"): # Called by _inbound_worker when picking up a message from inbound queue # Primarily used for deserializing messages from the wire + # Will get one argument: incoming "raw" message content over the wire self.inbound_message_hook: Optional[Callable] = None if not hasattr(self, "outbound_message_hook"): # Called by _outbound_worker before pushing a message to _publish # Primarily used for serializing messages going out over the wire + # Will get one argument, the QueuedMessage.contents coming out of .send() self.outbound_message_hook: Optional[Callable] = None if not hasattr(self, "context_hook"): # Called at .initialize() and then within the while True loop for # each worker. Should be used to set structlog.contextvars.bind_contextvars. + # Takes no arguments self.context_hook: Optional[Callable] = None + if not hasattr(self, "callback_hook"): + # Called when the inbound worker is about to pass a QueuedMessage.contents + # into a callback. Primarily used for logging the QueuedMessage.topic as + # contextvars in the callback logging. Can be removed if we begin passing + # kwargs / full QueuedMessage to callbacks. + # Takses two arguments: the QueuedMessage and the Callback + self.callback_hook: Optional[Callable] = None async def initialize( self, @@ -343,6 +353,8 @@ async def _delegate_to_callback(self, message: QueuedMessage, callback_id: UUID) # TODO(nick): I would love to have a set of kwargs that are passed around # for callbacks + predicates that you opt-in to. That would be a bit easier # to document and access. + if self.callback_hook: + await self.callback_hook(message, cb) await cb.method(message.contents) else: logger.debug( diff --git a/tests/test_websocket_backend.py b/tests/test_websocket_backend.py index 21bf110..8adba0f 100644 --- a/tests/test_websocket_backend.py +++ b/tests/test_websocket_backend.py @@ -9,7 +9,7 @@ from managed_service_fixtures import AppDetails, AppManager from sending.backends.websocket import WebsocketManager -from sending.base import QueuedMessage +from sending.base import Callback, QueuedMessage @pytest.fixture(scope="session") @@ -233,6 +233,11 @@ def __init__(self, ws_url): async def context_hook(self): structlog.contextvars.bind_contextvars(session_id=self.session_id) + async def callback_hook(self, message: QueuedMessage, callback: Callback): + structlog.contextvars.bind_contextvars( + topic=message.topic, callback_name=callback.qualname + ) + async def connect_hook(self, mgr): ws = await self.unauth_ws self.session_id = ws.response_headers.get("session_id") @@ -267,6 +272,11 @@ async def log_received(self, message: dict): assert receive_log["event"] == "Received {'type': 'unauthed_echo_reply', 'text': 'Hello 1'}" assert receive_log["session_id"] assert receive_log["func_name"] == "log_received" + assert receive_log["topic"] == "" + assert ( + receive_log["callback_name"] + == "test_structlog_contextvars_worker_hook..Sub.log_received" + ) await mgr.shutdown()