Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid running nested runloops in ThreadedZMQSocketChannel #831

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 2 additions & 14 deletions jupyter_client/threaded.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
""" Defines a KernelClient that provides thread-safe sockets with async callbacks on message
replies.
"""
import asyncio
import atexit
import errno
import time
from threading import Event
from threading import Thread
from typing import Any
from typing import Awaitable
from typing import Dict
from typing import List
from typing import Optional
from typing import Union

import zmq
from traitlets import Instance
Expand All @@ -30,10 +27,6 @@
# during garbage collection of threads at exit


async def get_msg(msg: Awaitable) -> Union[List[bytes], List[zmq.Message]]:
return await msg


class ThreadedZMQSocketChannel(object):
"""A ZMQ socket invoking a callback in the ioloop"""

Expand Down Expand Up @@ -70,7 +63,7 @@ def __init__(
def setup_stream():
assert self.socket is not None
self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
self.stream.on_recv(self._handle_recv) # type:ignore[arg-type]
self.stream.on_recv(self._handle_recv)
evt.set()

assert self.ioloop is not None
Expand Down Expand Up @@ -114,13 +107,11 @@ def thread_send():
assert self.ioloop is not None
self.ioloop.add_callback(thread_send)

def _handle_recv(self, future_msg: Awaitable) -> None:
def _handle_recv(self, msg_list: List[bytes]) -> None:
"""Callback for stream.on_recv.

Unpacks message, and calls handlers with it.
"""
assert self.ioloop is not None
msg_list = self.ioloop._asyncio_event_loop.run_until_complete(get_msg(future_msg))
assert self.session is not None
ident, smsg = self.session.feed_identities(msg_list)
msg = self.session.deserialize(smsg)
Expand Down Expand Up @@ -209,10 +200,7 @@ def start(self) -> None:

def run(self) -> None:
"""Run my loop, ignoring EINTR events in the poller"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self.ioloop = ioloop.IOLoop()
self.ioloop._asyncio_event_loop = loop
# signal that self.ioloop is defined
self._start_event.set()
while True:
Expand Down