diff --git a/jupyter_client/threaded.py b/jupyter_client/threaded.py index 6c46219d0..9433ea859 100644 --- a/jupyter_client/threaded.py +++ b/jupyter_client/threaded.py @@ -1,18 +1,15 @@ """ Defines a KernelClient that provides thread-safe sockets with async callbacks on message replies. """ -import asyncio import atexit import errno import time from threading import Event from threading import Thread from typing import Any -from typing import Awaitable from typing import Dict from typing import List from typing import Optional -from typing import Union import zmq from traitlets import Instance @@ -30,10 +27,6 @@ # during garbage collection of threads at exit -async def get_msg(msg: Awaitable) -> Union[List[bytes], List[zmq.Message]]: - return await msg - - class ThreadedZMQSocketChannel(object): """A ZMQ socket invoking a callback in the ioloop""" @@ -70,7 +63,7 @@ def __init__( def setup_stream(): assert self.socket is not None self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) - self.stream.on_recv(self._handle_recv) # type:ignore[arg-type] + self.stream.on_recv(self._handle_recv) evt.set() assert self.ioloop is not None @@ -114,13 +107,11 @@ def thread_send(): assert self.ioloop is not None self.ioloop.add_callback(thread_send) - def _handle_recv(self, future_msg: Awaitable) -> None: + def _handle_recv(self, msg_list: List[bytes]) -> None: """Callback for stream.on_recv. Unpacks message, and calls handlers with it. """ - assert self.ioloop is not None - msg_list = self.ioloop._asyncio_event_loop.run_until_complete(get_msg(future_msg)) assert self.session is not None ident, smsg = self.session.feed_identities(msg_list) msg = self.session.deserialize(smsg) @@ -209,10 +200,7 @@ def start(self) -> None: def run(self) -> None: """Run my loop, ignoring EINTR events in the poller""" - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) self.ioloop = ioloop.IOLoop() - self.ioloop._asyncio_event_loop = loop # signal that self.ioloop is defined self._start_event.set() while True: