Skip to content

Commit

Permalink
callback hook
Browse files Browse the repository at this point in the history
  • Loading branch information
Kafonek committed Sep 30, 2022
1 parent 9f0860e commit 6ab6522
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- New extra install `-E websockets`, additionally a convenience `-E all` option
- `context_hook` in Base Manager that can be used to bind structlog contextvars for all workers (inbound, outbound, poll)
- `connect_hook` and `disconnect_hook` for Websocket manager
- `callback_hook` that takes in `QueuedMessage` and `Callback`, useful for adding things like topic to contextvars for callback functions

### Changed
- Use `managed_service_fixtures` for Redis tests
Expand Down
12 changes: 12 additions & 0 deletions sending/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,25 @@ def __init__(self):
if not hasattr(self, "inbound_message_hook"):
# Called by _inbound_worker when picking up a message from inbound queue
# Primarily used for deserializing messages from the wire
# Will get one argument: incoming "raw" message content over the wire
self.inbound_message_hook: Optional[Callable] = None
if not hasattr(self, "outbound_message_hook"):
# Called by _outbound_worker before pushing a message to _publish
# Primarily used for serializing messages going out over the wire
# Will get one argument, the QueuedMessage.contents coming out of .send()
self.outbound_message_hook: Optional[Callable] = None
if not hasattr(self, "context_hook"):
# Called at .initialize() and then within the while True loop for
# each worker. Should be used to set structlog.contextvars.bind_contextvars.
# Takes no arguments
self.context_hook: Optional[Callable] = None
if not hasattr(self, "callback_hook"):
# Called when the inbound worker is about to pass a QueuedMessage.contents
# into a callback. Primarily used for logging the QueuedMessage.topic as
# contextvars in the callback logging. Can be removed if we begin passing
# kwargs / full QueuedMessage to callbacks.
# Takses two arguments: the QueuedMessage and the Callback
self.callback_hook: Optional[Callable] = None

async def initialize(
self,
Expand Down Expand Up @@ -343,6 +353,8 @@ async def _delegate_to_callback(self, message: QueuedMessage, callback_id: UUID)
# TODO(nick): I would love to have a set of kwargs that are passed around
# for callbacks + predicates that you opt-in to. That would be a bit easier
# to document and access.
if self.callback_hook:
await self.callback_hook(message, cb)
await cb.method(message.contents)
else:
logger.debug(
Expand Down
12 changes: 11 additions & 1 deletion tests/test_websocket_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from managed_service_fixtures import AppDetails, AppManager

from sending.backends.websocket import WebsocketManager
from sending.base import QueuedMessage
from sending.base import Callback, QueuedMessage


@pytest.fixture(scope="session")
Expand Down Expand Up @@ -233,6 +233,11 @@ def __init__(self, ws_url):
async def context_hook(self):
structlog.contextvars.bind_contextvars(session_id=self.session_id)

async def callback_hook(self, message: QueuedMessage, callback: Callback):
structlog.contextvars.bind_contextvars(
topic=message.topic, callback_name=callback.qualname
)

async def connect_hook(self, mgr):
ws = await self.unauth_ws
self.session_id = ws.response_headers.get("session_id")
Expand Down Expand Up @@ -267,6 +272,11 @@ async def log_received(self, message: dict):
assert receive_log["event"] == "Received {'type': 'unauthed_echo_reply', 'text': 'Hello 1'}"
assert receive_log["session_id"]
assert receive_log["func_name"] == "log_received"
assert receive_log["topic"] == ""
assert (
receive_log["callback_name"]
== "test_structlog_contextvars_worker_hook.<locals>.Sub.log_received"
)

await mgr.shutdown()

Expand Down

0 comments on commit 6ab6522

Please sign in to comment.