Skip to content

Commit

Permalink
comments and docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
Kafonek committed Sep 30, 2022
1 parent 1766aa1 commit 9f0860e
Showing 1 changed file with 25 additions and 3 deletions.
28 changes: 25 additions & 3 deletions sending/backends/jupyter.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,40 @@ def set_context_option(self, option: int, val: Union[int, bytes]):
self._context.setsockopt(option, val)

async def watch_for_channel_messages(self, topic_name: str, channel_obj: ZMQSocketChannel):
"""
Read in any messages on a specific jupyter_client channel and drop them into the inbound
worker queue which will trigger registered callback functions by predicate / topic
"""
while True:
msg: dict = await channel_obj.get_msg()
print(msg)
self.schedule_for_delivery(topic_name, msg)

async def watch_for_disconnect(self, monitor_socket: zmq.Socket):
"""
An awaitable task that ends when a particular socket has a disconnect event. Used in
conjunction with watch_for_channel_messages to cycle a socket when it's disconnected.
"""
while True:
msg: dict = await recv_monitor_message(monitor_socket)
event: zmq.Event = msg["event"]
if event == zmq.EVENT_DISCONNECTED:
return

async def watch_channel(self, topic_name: str):
"""
When a user subscribes to a topic (mgr.subscribe_to_topic('iopub')), this function starts
two tasks:
1. Pull messages off the zmq channel and trigger any registered callbacks
2. Watch the monitor socket for disconnect events and reconnect / restart tasks
"""
channel_name = f"{topic_name}_channel"

# context_hook (primarily for adding structlog contextvars) is normally called in base
# _poll_loop, so that it's applied to every read the poll worker does. For the Jupyter
# implementation, we don't use _poll_loop, all reads from zmq start with tasks here,
# and any contextvars set in this method will be picked up by tasks created here.
if self.context_hook:
await self.context_hook()
while True:
# The channel properties (e.g. self._client.iopub_channel) will connect the socket
# if self._client._iopub_channel is None.
Expand Down Expand Up @@ -101,7 +121,6 @@ async def watch_channel(self, topic_name: str):
self.channel_tasks[topic_name].remove(monitor_task)
self.channel_tasks[topic_name].remove(message_task)
logger.info(f"Cycling topic {topic_name} after disconnect")
print(f"Cycling topic {topic_name} after disconnect")
self._emit_system_event(topic_name, SystemEvents.FORCED_DISCONNECT)
channel_obj.close()
setattr(self._client, f"_{channel_name}", None)
Expand All @@ -111,7 +130,6 @@ async def _create_topic_subscription(self, topic_name: str):
self.channel_tasks[topic_name].append(task)

async def _cleanup_topic_subscription(self, topic_name: str):
print(f"Cleaning up topic {topic_name}")
if topic_name in self.channel_tasks:
for task in self.channel_tasks[topic_name]:
task.cancel()
Expand Down Expand Up @@ -144,6 +162,10 @@ async def _publish(self, message: QueuedMessage):
channel_obj = getattr(self._client, f"{topic_name}_channel")
channel_obj.send(message.contents)

# _poll and _poll_loop are designed to be used to define how a Sending backend
# will read incoming data over the wire (socket, websocket, etc). In this implementation
# when we subscribe to a topic, it starts a watch_channel task which handles reading
# data over the right jupyter_client / zmq channel. So _poll and _poll_loop aren't used.
async def _poll(self):
pass

Expand Down

0 comments on commit 9f0860e

Please sign in to comment.