From 1fe492aec41d52f4fc343ffdc7469167a5f41ae1 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Thu, 7 Nov 2024 14:57:12 +0100 Subject: [PATCH 01/26] Use zmq-anyio --- ipykernel/debugger.py | 4 +- ipykernel/inprocess/ipkernel.py | 2 +- ipykernel/inprocess/session.py | 2 +- ipykernel/iostream.py | 72 +++++++++++----------- ipykernel/ipkernel.py | 9 +-- ipykernel/kernelapp.py | 59 ++++-------------- ipykernel/kernelbase.py | 103 ++++++++++++++++++-------------- ipykernel/shellchannel.py | 12 +++- ipykernel/subshell.py | 14 +++-- ipykernel/subshell_manager.py | 56 +++++++++++------ ipykernel/thread.py | 5 ++ pyproject.toml | 3 +- tests/conftest.py | 25 +++----- tests/test_async.py | 16 ++--- tests/test_io.py | 97 ++++++++++++++++-------------- 15 files changed, 244 insertions(+), 235 deletions(-) diff --git a/ipykernel/debugger.py b/ipykernel/debugger.py index 780d1801..36aced05 100644 --- a/ipykernel/debugger.py +++ b/ipykernel/debugger.py @@ -241,7 +241,7 @@ async def _send_request(self, msg): self.log.debug("DEBUGPYCLIENT:") self.log.debug(self.routing_id) self.log.debug(buf) - await self.debugpy_socket.send_multipart((self.routing_id, buf)) + await self.debugpy_socket.asend_multipart((self.routing_id, buf)) async def _wait_for_response(self): # Since events are never pushed to the message_queue @@ -437,7 +437,7 @@ async def start(self): (self.shell_socket.getsockopt(ROUTING_ID)), ) - msg = await self.shell_socket.recv_multipart() + msg = await self.shell_socket.arecv_multipart() ident, msg = self.session.feed_identities(msg, copy=True) try: msg = self.session.deserialize(msg, content=True, copy=True) diff --git a/ipykernel/inprocess/ipkernel.py b/ipykernel/inprocess/ipkernel.py index c6f8c612..5abb691c 100644 --- a/ipykernel/inprocess/ipkernel.py +++ b/ipykernel/inprocess/ipkernel.py @@ -54,7 +54,7 @@ class InProcessKernel(IPythonKernel): _underlying_iopub_socket = Instance(DummySocket, (False,)) iopub_thread: IOPubThread = Instance(IOPubThread) # type:ignore[assignment] - shell_socket = Instance(DummySocket, (True,)) # type:ignore[arg-type] + shell_socket = Instance(DummySocket, (True,)) @default("iopub_thread") def _default_iopub_thread(self): diff --git a/ipykernel/inprocess/session.py b/ipykernel/inprocess/session.py index 0eaed2c6..70b13574 100644 --- a/ipykernel/inprocess/session.py +++ b/ipykernel/inprocess/session.py @@ -3,7 +3,7 @@ class Session(_Session): async def recv(self, socket, copy=True): - return await socket.recv_multipart() + return await socket.arecv_multipart() def send( self, diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py index d8171017..19334212 100644 --- a/ipykernel/iostream.py +++ b/ipykernel/iostream.py @@ -20,6 +20,7 @@ from typing import Any, Callable import zmq +import zmq_anyio from anyio import create_task_group, run, sleep, to_thread from jupyter_client.session import extract_header @@ -55,11 +56,11 @@ def run(self): run(self._main) async def _main(self): - async with create_task_group() as tg: + async with create_task_group() as self._task_group: for task in self._tasks: - tg.start_soon(task) + self._task_group.start_soon(task) await to_thread.run_sync(self.__stop.wait) - tg.cancel_scope.cancel() + self._task_group.cancel_scope.cancel() def stop(self): """Stop the thread. @@ -78,7 +79,7 @@ class IOPubThread: whose IO is always run in a thread. """ - def __init__(self, socket, pipe=False): + def __init__(self, socket: zmq_anyio.Socket, pipe=False): """Create IOPub thread Parameters @@ -91,10 +92,7 @@ def __init__(self, socket, pipe=False): """ # ensure all of our sockets as sync zmq.Sockets # don't create async wrappers until we are within the appropriate coroutines - self.socket: zmq.Socket[bytes] | None = zmq.Socket(socket) - if self.socket.context is None: - # bug in pyzmq, shadow socket doesn't always inherit context attribute - self.socket.context = socket.context # type:ignore[unreachable] + self.socket: zmq_anyio.Socket = socket self._context = socket.context self.background_socket = BackgroundSocket(self) @@ -108,14 +106,14 @@ def __init__(self, socket, pipe=False): self._event_pipe_gc_lock: threading.Lock = threading.Lock() self._event_pipe_gc_seconds: float = 10 self._setup_event_pipe() - tasks = [self._handle_event, self._run_event_pipe_gc] + tasks = [self._handle_event, self._run_event_pipe_gc, self.socket.start] if pipe: tasks.append(self._handle_pipe_msgs) self.thread = _IOPubThread(tasks) def _setup_event_pipe(self): """Create the PULL socket listening for events that should fire in this thread.""" - self._pipe_in0 = self._context.socket(zmq.PULL, socket_class=zmq.Socket) + self._pipe_in0 = self._context.socket(zmq.PULL) self._pipe_in0.linger = 0 _uuid = b2a_hex(os.urandom(16)).decode("ascii") @@ -150,7 +148,7 @@ def _event_pipe(self): except AttributeError: # new thread, new event pipe # create sync base socket - event_pipe = self._context.socket(zmq.PUSH, socket_class=zmq.Socket) + event_pipe = self._context.socket(zmq.PUSH) event_pipe.linger = 0 event_pipe.connect(self._event_interface) self._local.event_pipe = event_pipe @@ -169,30 +167,28 @@ async def _handle_event(self): Whenever *an* event arrives on the event stream, *all* waiting events are processed in order. """ - # create async wrapper within coroutine - pipe_in = zmq.asyncio.Socket(self._pipe_in0) - try: - while True: - await pipe_in.recv() - # freeze event count so new writes don't extend the queue - # while we are processing - n_events = len(self._events) - for _ in range(n_events): - event_f = self._events.popleft() - event_f() - except Exception: - if self.thread.__stop.is_set(): - return - raise + pipe_in = zmq_anyio.Socket(self._pipe_in0) + async with pipe_in: + try: + while True: + await pipe_in.arecv() + # freeze event count so new writes don't extend the queue + # while we are processing + n_events = len(self._events) + for _ in range(n_events): + event_f = self._events.popleft() + event_f() + except Exception: + if self.thread.__stop.is_set(): + return + raise def _setup_pipe_in(self): """setup listening pipe for IOPub from forked subprocesses""" - ctx = self._context - # use UUID to authenticate pipe messages self._pipe_uuid = os.urandom(16) - self._pipe_in1 = ctx.socket(zmq.PULL, socket_class=zmq.Socket) + self._pipe_in1 = zmq_anyio.Socket(self._context.socket(zmq.PULL)) self._pipe_in1.linger = 0 try: @@ -210,18 +206,18 @@ def _setup_pipe_in(self): async def _handle_pipe_msgs(self): """handle pipe messages from a subprocess""" # create async wrapper within coroutine - self._async_pipe_in1 = zmq.asyncio.Socket(self._pipe_in1) - try: - while True: - await self._handle_pipe_msg() - except Exception: - if self.thread.__stop.is_set(): - return - raise + async with self._pipe_in1: + try: + while True: + await self._handle_pipe_msg() + except Exception: + if self.thread.__stop.is_set(): + return + raise async def _handle_pipe_msg(self, msg=None): """handle a pipe message from a subprocess""" - msg = msg or await self._async_pipe_in1.recv_multipart() + msg = msg or await self._pipe_in1.arecv_multipart() if not self._pipe_flag or not self._is_main_process(): return if msg[0] != self._pipe_uuid: diff --git a/ipykernel/ipkernel.py b/ipykernel/ipkernel.py index 48efa6cd..d8d2ba5d 100644 --- a/ipykernel/ipkernel.py +++ b/ipykernel/ipkernel.py @@ -12,7 +12,7 @@ from dataclasses import dataclass import comm -import zmq.asyncio +import zmq_anyio from anyio import TASK_STATUS_IGNORED, create_task_group, to_thread from anyio.abc import TaskStatus from IPython.core import release @@ -76,7 +76,7 @@ class IPythonKernel(KernelBase): help="Set this flag to False to deactivate the use of experimental IPython completion APIs.", ).tag(config=True) - debugpy_socket = Instance(zmq.asyncio.Socket, allow_none=True) + debugpy_socket = Instance(zmq_anyio.Socket, allow_none=True) user_module = Any() @@ -212,7 +212,8 @@ def __init__(self, **kwargs): } async def process_debugpy(self): - async with create_task_group() as tg: + assert self.debugpy_socket is not None + async with self.debug_shell_socket, self.debugpy_socket, create_task_group() as tg: tg.start_soon(self.receive_debugpy_messages) tg.start_soon(self.poll_stopped_queue) await to_thread.run_sync(self.debugpy_stop.wait) @@ -235,7 +236,7 @@ async def receive_debugpy_message(self, msg=None): if msg is None: assert self.debugpy_socket is not None - msg = await self.debugpy_socket.recv_multipart() + msg = await self.debugpy_socket.arecv_multipart() # The first frame is the socket id, we can drop it frame = msg[1].decode("utf-8") self.log.debug("Debugpy received: %s", frame) diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index 55efaa8e..1cf5697b 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -18,7 +18,7 @@ from pathlib import Path import zmq -import zmq.asyncio +import zmq_anyio from anyio import create_task_group, run from IPython.core.application import ( # type:ignore[attr-defined] BaseIPythonApplication, @@ -325,15 +325,15 @@ def init_sockets(self): """Create a context, a session, and the kernel sockets.""" self.log.info("Starting the kernel at pid: %i", os.getpid()) assert self.context is None, "init_sockets cannot be called twice!" - self.context = context = zmq.asyncio.Context() + self.context = context = zmq.Context() atexit.register(self.close) - self.shell_socket = context.socket(zmq.ROUTER) + self.shell_socket = zmq_anyio.Socket(context.socket(zmq.ROUTER)) self.shell_socket.linger = 1000 self.shell_port = self._bind_socket(self.shell_socket, self.shell_port) self.log.debug("shell ROUTER Channel on port: %i" % self.shell_port) - self.stdin_socket = zmq.Context(context).socket(zmq.ROUTER) + self.stdin_socket = context.socket(zmq.ROUTER) self.stdin_socket.linger = 1000 self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port) self.log.debug("stdin ROUTER Channel on port: %i" % self.stdin_port) @@ -349,18 +349,19 @@ def init_sockets(self): def init_control(self, context): """Initialize the control channel.""" - self.control_socket = context.socket(zmq.ROUTER) + self.control_socket = zmq_anyio.Socket(context.socket(zmq.ROUTER)) self.control_socket.linger = 1000 self.control_port = self._bind_socket(self.control_socket, self.control_port) self.log.debug("control ROUTER Channel on port: %i" % self.control_port) - self.debugpy_socket = context.socket(zmq.STREAM) + self.debugpy_socket = zmq_anyio.Socket(context, zmq.STREAM) self.debugpy_socket.linger = 1000 - self.debug_shell_socket = context.socket(zmq.DEALER) + self.debug_shell_socket = zmq_anyio.Socket(context.socket(zmq.DEALER)) self.debug_shell_socket.linger = 1000 - if self.shell_socket.getsockopt(zmq.LAST_ENDPOINT): - self.debug_shell_socket.connect(self.shell_socket.getsockopt(zmq.LAST_ENDPOINT)) + last_endpoint = self.shell_socket.getsockopt(zmq.LAST_ENDPOINT) + if last_endpoint: + self.debug_shell_socket.connect(last_endpoint) if hasattr(zmq, "ROUTER_HANDOVER"): # set router-handover to workaround zeromq reconnect problems @@ -373,7 +374,7 @@ def init_control(self, context): def init_iopub(self, context): """Initialize the iopub channel.""" - self.iopub_socket = context.socket(zmq.PUB) + self.iopub_socket = zmq_anyio.Socket(context.socket(zmq.PUB)) self.iopub_socket.linger = 1000 self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port) self.log.debug("iopub PUB Channel on port: %i" % self.iopub_port) @@ -634,43 +635,6 @@ def configure_tornado_logger(self): handler.setFormatter(formatter) logger.addHandler(handler) - def _init_asyncio_patch(self): - """set default asyncio policy to be compatible with tornado - - Tornado 6 (at least) is not compatible with the default - asyncio implementation on Windows - - Pick the older SelectorEventLoopPolicy on Windows - if the known-incompatible default policy is in use. - - Support for Proactor via a background thread is available in tornado 6.1, - but it is still preferable to run the Selector in the main thread - instead of the background. - - do this as early as possible to make it a low priority and overridable - - ref: https://github.com/tornadoweb/tornado/issues/2608 - - FIXME: if/when tornado supports the defaults in asyncio without threads, - remove and bump tornado requirement for py38. - Most likely, this will mean a new Python version - where asyncio.ProactorEventLoop supports add_reader and friends. - - """ - if sys.platform.startswith("win"): - import asyncio - - try: - from asyncio import WindowsProactorEventLoopPolicy, WindowsSelectorEventLoopPolicy - except ImportError: - pass - # not affected - else: - if type(asyncio.get_event_loop_policy()) is WindowsProactorEventLoopPolicy: - # WindowsProactorEventLoopPolicy is not compatible with tornado 6 - # fallback to the pre-3.8 default of Selector - asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy()) - def init_pdb(self): """Replace pdb with IPython's version that is interruptible. @@ -690,7 +654,6 @@ def init_pdb(self): @catch_config_error def initialize(self, argv=None): """Initialize the application.""" - self._init_asyncio_patch() super().initialize(argv) if self.subapp is not None: return diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index d496e0c9..3c7324d2 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -35,6 +35,7 @@ import psutil import zmq +import zmq_anyio from anyio import TASK_STATUS_IGNORED, Event, create_task_group, sleep, to_thread from anyio.abc import TaskStatus from IPython.core.error import StdinNotImplementedError @@ -88,7 +89,7 @@ class Kernel(SingletonConfigurable): session = Instance(Session, allow_none=True) profile_dir = Instance("IPython.core.profiledir.ProfileDir", allow_none=True) - shell_socket = Instance(zmq.asyncio.Socket, allow_none=True) + shell_socket = Instance(zmq_anyio.Socket, allow_none=True) implementation: str implementation_version: str @@ -96,7 +97,7 @@ class Kernel(SingletonConfigurable): _is_test = Bool(False) - control_socket = Instance(zmq.asyncio.Socket, allow_none=True) + control_socket = Instance(zmq_anyio.Socket, allow_none=True) control_tasks: t.Any = List() debug_shell_socket = Any() @@ -267,7 +268,7 @@ async def process_control_message(self, msg=None): assert self.session is not None assert self.control_thread is None or threading.current_thread() == self.control_thread - msg = msg or await self.control_socket.recv_multipart() + msg = msg or await self.control_socket.arecv_multipart() idents, msg = self.session.feed_identities(msg) try: msg = self.session.deserialize(msg, content=True) @@ -364,26 +365,29 @@ async def shell_channel_thread_main(self): assert self.shell_channel_thread is not None assert threading.current_thread() == self.shell_channel_thread - try: - while True: - msg = await self.shell_socket.recv_multipart(copy=False) - # deserialize only the header to get subshell_id - # Keep original message to send to subshell_id unmodified. - _, msg2 = self.session.feed_identities(msg, copy=False) - try: - msg3 = self.session.deserialize(msg2, content=False, copy=False) - subshell_id = msg3["header"].get("subshell_id") - - # Find inproc pair socket to use to send message to correct subshell. - socket = self.shell_channel_thread.manager.get_shell_channel_socket(subshell_id) - assert socket is not None - socket.send_multipart(msg, copy=False) - except Exception: - self.log.error("Invalid message", exc_info=True) # noqa: G201 - except BaseException: - if self.shell_stop.is_set(): - return - raise + async with self.shell_socket: + try: + while True: + msg = await self.shell_socket.arecv_multipart(copy=False) + # deserialize only the header to get subshell_id + # Keep original message to send to subshell_id unmodified. + _, msg2 = self.session.feed_identities(msg, copy=False) + try: + msg3 = self.session.deserialize(msg2, content=False, copy=False) + subshell_id = msg3["header"].get("subshell_id") + + # Find inproc pair socket to use to send message to correct subshell. + socket = self.shell_channel_thread.manager.get_shell_channel_socket( + subshell_id + ) + assert socket is not None + socket.send_multipart(msg, copy=False) + except Exception: + self.log.error("Invalid message", exc_info=True) # noqa: G201 + except BaseException: + if self.shell_stop.is_set(): + return + raise async def shell_main(self, subshell_id: str | None): """Main loop for a single subshell.""" @@ -411,13 +415,15 @@ async def shell_main(self, subshell_id: str | None): async def process_shell(self, socket=None): # socket=None is valid if kernel subshells are not supported. - try: - while True: - await self.process_shell_message(socket=socket) - except BaseException: - if self.shell_stop.is_set(): - return - raise + _socket = self.shell_socket if socket is None else socket + async with _socket: + try: + while True: + await self.process_shell_message(socket=socket) + except BaseException: + if self.shell_stop.is_set(): + return + raise async def process_shell_message(self, msg=None, socket=None): # If socket is None kernel subshells are not supported so use socket=shell_socket. @@ -435,8 +441,8 @@ async def process_shell_message(self, msg=None, socket=None): assert socket is None socket = self.shell_socket - no_msg = msg is None if self._is_test else not await socket.poll(0) - msg = msg or await socket.recv_multipart(copy=False) + no_msg = msg is None if self._is_test else not await socket.apoll(0) + msg = msg or await socket.arecv_multipart(copy=False) received_time = time.monotonic() copy = not isinstance(msg[0], zmq.Message) @@ -490,7 +496,7 @@ async def process_shell_message(self, msg=None, socket=None): try: result = handler(socket, idents, msg) if inspect.isawaitable(result): - await result + result = await result except Exception: self.log.error("Exception in message handler:", exc_info=True) # noqa: G201 except KeyboardInterrupt: @@ -509,7 +515,8 @@ async def process_shell_message(self, msg=None, socket=None): self._publish_status("idle", "shell") async def control_main(self): - async with create_task_group() as tg: + assert self.control_socket is not None + async with self.control_socket, create_task_group() as tg: for task in self.control_tasks: tg.start_soon(task) tg.start_soon(self.process_control) @@ -545,7 +552,9 @@ async def start(self, *, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None: # Assign tasks to and start shell channel thread. manager = self.shell_channel_thread.manager self.shell_channel_thread.add_task(self.shell_channel_thread_main) - self.shell_channel_thread.add_task(manager.listen_from_control, self.shell_main) + self.shell_channel_thread.add_task( + manager.listen_from_control, self.shell_main, self.shell_channel_thread + ) self.shell_channel_thread.add_task(manager.listen_from_subshells) self.shell_channel_thread.start() else: @@ -1075,9 +1084,11 @@ async def create_subshell_request(self, socket, ident, parent) -> None: # This should only be called in the control thread if it exists. # Request is passed to shell channel thread to process. - other_socket = self.shell_channel_thread.manager.get_control_other_socket() - await other_socket.send_json({"type": "create"}) - reply = await other_socket.recv_json() + other_socket = await self.shell_channel_thread.manager.get_control_other_socket( + self.control_thread.get_task_group() + ) + await other_socket.asend_json({"type": "create"}) + reply = await other_socket.arecv_json() self.session.send(socket, "create_subshell_reply", reply, parent, ident) @@ -1097,9 +1108,11 @@ async def delete_subshell_request(self, socket, ident, parent) -> None: # This should only be called in the control thread if it exists. # Request is passed to shell channel thread to process. - other_socket = self.shell_channel_thread.manager.get_control_other_socket() - await other_socket.send_json({"type": "delete", "subshell_id": subshell_id}) - reply = await other_socket.recv_json() + other_socket = await self.shell_channel_thread.manager.get_control_other_socket( + self.control_thread.get_task_group() + ) + await other_socket.asend_json({"type": "delete", "subshell_id": subshell_id}) + reply = await other_socket.arecv_json() self.session.send(socket, "delete_subshell_reply", reply, parent, ident) @@ -1112,9 +1125,11 @@ async def list_subshell_request(self, socket, ident, parent) -> None: # This should only be called in the control thread if it exists. # Request is passed to shell channel thread to process. - other_socket = self.shell_channel_thread.manager.get_control_other_socket() - await other_socket.send_json({"type": "list"}) - reply = await other_socket.recv_json() + other_socket = await self.shell_channel_thread.manager.get_control_other_socket( + self.control_thread.get_task_group() + ) + await other_socket.asend_json({"type": "list"}) + reply = await other_socket.arecv_json() self.session.send(socket, "list_subshell_reply", reply, parent, ident) diff --git a/ipykernel/shellchannel.py b/ipykernel/shellchannel.py index bc0459c4..789a8875 100644 --- a/ipykernel/shellchannel.py +++ b/ipykernel/shellchannel.py @@ -1,5 +1,6 @@ """A thread for a shell channel.""" -import zmq.asyncio +import zmq +import zmq_anyio from .subshell_manager import SubshellManager from .thread import SHELL_CHANNEL_THREAD_NAME, BaseThread @@ -11,7 +12,12 @@ class ShellChannelThread(BaseThread): Communicates with shell/subshell threads via pairs of ZMQ inproc sockets. """ - def __init__(self, context: zmq.asyncio.Context, shell_socket: zmq.asyncio.Socket, **kwargs): + def __init__( + self, + context: zmq.Context, # type: ignore[type-arg] + shell_socket: zmq_anyio.Socket, + **kwargs, + ): """Initialize the thread.""" super().__init__(name=SHELL_CHANNEL_THREAD_NAME, **kwargs) self._manager: SubshellManager | None = None @@ -22,7 +28,7 @@ def __init__(self, context: zmq.asyncio.Context, shell_socket: zmq.asyncio.Socke def manager(self) -> SubshellManager: # Lazy initialisation. if self._manager is None: - self._manager = SubshellManager(self._context, self._shell_socket) + self._manager = SubshellManager(self._context, self._shell_socket, self.get_task_group) return self._manager def run(self) -> None: diff --git a/ipykernel/subshell.py b/ipykernel/subshell.py index 18e15ab3..e84f5498 100644 --- a/ipykernel/subshell.py +++ b/ipykernel/subshell.py @@ -2,7 +2,8 @@ from threading import current_thread -import zmq.asyncio +import zmq +import zmq_anyio from .thread import BaseThread @@ -15,17 +16,22 @@ def __init__(self, subshell_id: str, **kwargs): super().__init__(name=f"subshell-{subshell_id}", **kwargs) # Inproc PAIR socket, for communication with shell channel thread. - self._pair_socket: zmq.asyncio.Socket | None = None + self._pair_socket: zmq_anyio.Socket | None = None - async def create_pair_socket(self, context: zmq.asyncio.Context, address: str) -> None: + async def create_pair_socket( + self, + context: zmq.Context, # type: ignore[type-arg] + address: str, + ) -> None: """Create inproc PAIR socket, for communication with shell channel thread. Should be called from this thread, so usually via add_task before the thread is started. """ assert current_thread() == self - self._pair_socket = context.socket(zmq.PAIR) + self._pair_socket = zmq_anyio.Socket(context, zmq.PAIR) self._pair_socket.connect(address) + self.add_task(self._pair_socket.start) def run(self) -> None: try: diff --git a/ipykernel/subshell_manager.py b/ipykernel/subshell_manager.py index 805d6f81..dbd3da76 100644 --- a/ipykernel/subshell_manager.py +++ b/ipykernel/subshell_manager.py @@ -8,19 +8,21 @@ import uuid from dataclasses import dataclass from threading import Lock, current_thread, main_thread +from typing import Callable import zmq -import zmq.asyncio +import zmq_anyio from anyio import create_memory_object_stream, create_task_group +from anyio.abc import TaskGroup from .subshell import SubshellThread -from .thread import SHELL_CHANNEL_THREAD_NAME +from .thread import SHELL_CHANNEL_THREAD_NAME, BaseThread @dataclass class Subshell: thread: SubshellThread - shell_channel_socket: zmq.asyncio.Socket + shell_channel_socket: zmq_anyio.Socket class SubshellManager: @@ -38,11 +40,17 @@ class SubshellManager: against multiple subshells attempting to send at the same time. """ - def __init__(self, context: zmq.asyncio.Context, shell_socket: zmq.asyncio.Socket): + def __init__( + self, + context: zmq.Context, # type: ignore[type-arg] + shell_socket: zmq_anyio.Socket, + get_task_group: Callable[[], TaskGroup], + ): assert current_thread() == main_thread() - self._context: zmq.asyncio.Context = context + self._context: zmq.Context = context # type: ignore[type-arg] self._shell_socket = shell_socket + self._get_task_group = get_task_group self._cache: dict[str, Subshell] = {} self._lock_cache = Lock() self._lock_shell_socket = Lock() @@ -83,10 +91,13 @@ def close(self) -> None: break self._stop_subshell(subshell) - def get_control_other_socket(self) -> zmq.asyncio.Socket: + async def get_control_other_socket(self, task_group: TaskGroup) -> zmq_anyio.Socket: + if not self._control_other_socket.started.is_set(): + task_group.start_soon(self._control_other_socket.start) + await self._control_other_socket.started.wait() return self._control_other_socket - def get_other_socket(self, subshell_id: str | None) -> zmq.asyncio.Socket: + def get_other_socket(self, subshell_id: str | None) -> zmq_anyio.Socket: """Return the other inproc pair socket for a subshell. This socket is accessed from the subshell thread. @@ -98,7 +109,7 @@ def get_other_socket(self, subshell_id: str | None) -> zmq.asyncio.Socket: assert socket is not None return socket - def get_shell_channel_socket(self, subshell_id: str | None) -> zmq.asyncio.Socket: + def get_shell_channel_socket(self, subshell_id: str | None) -> zmq_anyio.Socket: """Return the shell channel inproc pair socket for a subshell. This socket is accessed from the shell channel thread. @@ -116,17 +127,20 @@ def list_subshell(self) -> list[str]: with self._lock_cache: return list(self._cache) - async def listen_from_control(self, subshell_task: t.Any) -> None: + async def listen_from_control(self, subshell_task: t.Any, thread: BaseThread) -> None: """Listen for messages on the control inproc socket, handle those messages and return replies on the same socket. Runs in the shell channel thread. """ assert current_thread().name == SHELL_CHANNEL_THREAD_NAME + if not self._control_shell_channel_socket.started.is_set(): + thread.get_task_group().start_soon(self._control_shell_channel_socket.start) + await self._control_shell_channel_socket.started.wait() socket = self._control_shell_channel_socket while True: - request = await socket.recv_json() # type: ignore[misc] + request = await socket.arecv_json() reply = await self._process_control_request(request, subshell_task) - await socket.send_json(reply) # type: ignore[func-returns-value] + await socket.asend_json(reply) async def listen_from_subshells(self) -> None: """Listen for reply messages on inproc sockets of all subshells and resend @@ -137,9 +151,9 @@ async def listen_from_subshells(self) -> None: assert current_thread().name == SHELL_CHANNEL_THREAD_NAME async with create_task_group() as tg: - tg.start_soon(self._listen_for_subshell_reply, None) + tg.start_soon(self._listen_for_subshell_reply, None, tg) async for subshell_id in self._receive_stream: - tg.start_soon(self._listen_for_subshell_reply, subshell_id) + tg.start_soon(self._listen_for_subshell_reply, subshell_id, tg) def subshell_id_from_thread_id(self, thread_id: int) -> str | None: """Return subshell_id of the specified thread_id. @@ -159,10 +173,10 @@ def subshell_id_from_thread_id(self, thread_id: int) -> str | None: def _create_inproc_pair_socket( self, name: str | None, shell_channel_end: bool - ) -> zmq.asyncio.Socket: + ) -> zmq_anyio.Socket: """Create and return a single ZMQ inproc pair socket.""" address = self._get_inproc_socket_address(name) - socket = self._context.socket(zmq.PAIR) + socket = zmq_anyio.Socket(self._context, zmq.PAIR) if shell_channel_end: socket.bind(address) else: @@ -208,7 +222,7 @@ def _get_inproc_socket_address(self, name: str | None) -> str: full_name = f"subshell-{name}" if name else "subshell" return f"inproc://{full_name}" - def _get_shell_channel_socket(self, subshell_id: str | None) -> zmq.asyncio.Socket: + def _get_shell_channel_socket(self, subshell_id: str | None) -> zmq_anyio.Socket: if subshell_id is None: return self._parent_shell_channel_socket with self._lock_cache: @@ -220,7 +234,9 @@ def _is_subshell(self, subshell_id: str | None) -> bool: with self._lock_cache: return subshell_id in self._cache - async def _listen_for_subshell_reply(self, subshell_id: str | None) -> None: + async def _listen_for_subshell_reply( + self, subshell_id: str | None, task_group: TaskGroup + ) -> None: """Listen for reply messages on specified subshell inproc socket and resend to the client via the shell_socket. @@ -230,11 +246,13 @@ async def _listen_for_subshell_reply(self, subshell_id: str | None) -> None: shell_channel_socket = self._get_shell_channel_socket(subshell_id) + task_group.start_soon(shell_channel_socket.start) + await shell_channel_socket.started.wait() try: while True: - msg = await shell_channel_socket.recv_multipart(copy=False) + msg = await shell_channel_socket.arecv_multipart(copy=False) with self._lock_shell_socket: - await self._shell_socket.send_multipart(msg) + await self._shell_socket.asend_multipart(msg) except BaseException: if not self._is_subshell(subshell_id): # Subshell no longer exists so exit gracefully diff --git a/ipykernel/thread.py b/ipykernel/thread.py index 40509ece..f55ee7c7 100644 --- a/ipykernel/thread.py +++ b/ipykernel/thread.py @@ -3,6 +3,7 @@ from threading import Event, Thread from anyio import create_task_group, run, to_thread +from anyio.abc import TaskGroup CONTROL_THREAD_NAME = "Control" SHELL_CHANNEL_THREAD_NAME = "Shell channel" @@ -19,6 +20,9 @@ def __init__(self, **kwargs): self.__stop = Event() self._tasks_and_args: list[tuple[t.Any, t.Any]] = [] + def get_task_group(self) -> TaskGroup: + return self._task_group + def add_task(self, task: t.Any, *args: t.Any) -> None: # May only add tasks before the thread is started. self._tasks_and_args.append((task, args)) @@ -29,6 +33,7 @@ def run(self) -> t.Any: async def _main(self) -> None: async with create_task_group() as tg: + self._task_group = tg for task, args in self._tasks_and_args: tg.start_soon(task, *args) await to_thread.run_sync(self.__stop.wait) diff --git a/pyproject.toml b/pyproject.toml index 675d9d87..5201e555 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,10 +30,11 @@ dependencies = [ "nest_asyncio>=1.4", "matplotlib-inline>=0.1", 'appnope>=0.1.2;platform_system=="Darwin"', - "pyzmq>=25.0", + "pyzmq>=26.0", "psutil>=5.7", "packaging>=22", "anyio>=4.2.0", + "zmq-anyio >=0.2.3", ] [project.urls] diff --git a/tests/conftest.py b/tests/conftest.py index 2c266555..fc798e74 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,14 +1,12 @@ -import asyncio import logging -import os from math import inf from typing import Any, Callable, no_type_check from unittest.mock import MagicMock import pytest import zmq -import zmq.asyncio -from anyio import create_memory_object_stream, create_task_group +import zmq_anyio +from anyio import create_memory_object_stream, create_task_group, sleep from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from jupyter_client.session import Session @@ -46,11 +44,6 @@ def anyio_backend(): resource.setrlimit(resource.RLIMIT_NOFILE, (soft, hard)) -# Enforce selector event loop on Windows. -if os.name == "nt": - asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) # type:ignore - - class TestSession(Session): """A session that copies sent messages to an internal stream, so that they can be accessed later. @@ -77,21 +70,21 @@ def send(self, socket, *args, **kwargs): class KernelMixin: - shell_socket: zmq.asyncio.Socket - control_socket: zmq.asyncio.Socket + shell_socket: zmq_anyio.Socket + control_socket: zmq_anyio.Socket stop: Callable[[], None] log = logging.getLogger() def _initialize(self): self._is_test = True - self.context = context = zmq.asyncio.Context() - self.iopub_socket = context.socket(zmq.PUB) - self.stdin_socket = context.socket(zmq.ROUTER) + self.context = context = zmq.Context() + self.iopub_socket = zmq_anyio.Socket(context.socket(zmq.PUB)) + self.stdin_socket = zmq_anyio.Socket(context.socket(zmq.ROUTER)) self.test_sockets = [self.iopub_socket] for name in ["shell", "control"]: - socket = context.socket(zmq.ROUTER) + socket = zmq_anyio.Socket(context.socket(zmq.ROUTER)) self.test_sockets.append(socket) setattr(self, f"{name}_socket", socket) @@ -142,7 +135,7 @@ def _prep_msg(self, *args, **kwargs): async def _wait_for_msg(self): while not self._reply: - await asyncio.sleep(0.1) + await sleep(0.1) _, msg = self.session.feed_identities(self._reply) return self.session.deserialize(msg) diff --git a/tests/test_async.py b/tests/test_async.py index a40db4a0..f1d91c5e 100644 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -8,6 +8,8 @@ from .test_message_spec import validate_message from .utils import TIMEOUT, execute, flush_channels, start_new_kernel +pytestmark = pytest.mark.anyio + KC = KM = None @@ -30,24 +32,22 @@ def test_async_await(): assert content["status"] == "ok", content -# FIXME: @pytest.mark.parametrize("asynclib", ["asyncio", "trio", "curio"]) @pytest.mark.skipif(os.name == "nt", reason="Cannot interrupt on Windows") -@pytest.mark.parametrize("asynclib", ["asyncio"]) -def test_async_interrupt(asynclib, request): +def test_async_interrupt(anyio_backend, request): assert KC is not None assert KM is not None try: - __import__(asynclib) + __import__(anyio_backend) except ImportError: - pytest.skip("Requires %s" % asynclib) - request.addfinalizer(lambda: execute("%autoawait asyncio", KC)) + pytest.skip("Requires %s" % anyio_backend) + request.addfinalizer(lambda: execute(f"%autoawait {anyio_backend}", KC)) flush_channels(KC) - msg_id, content = execute("%autoawait " + asynclib, KC) + msg_id, content = execute(f"%autoawait {anyio_backend}", KC) assert content["status"] == "ok", content flush_channels(KC) - msg_id = KC.execute(f"print('begin'); import {asynclib}; await {asynclib}.sleep(5)") + msg_id = KC.execute(f"print('begin'); import {anyio_backend}; await {anyio_backend}.sleep(5)") busy = KC.get_iopub_msg(timeout=TIMEOUT) validate_message(busy, "status", msg_id) assert busy["content"]["execution_state"] == "busy" diff --git a/tests/test_io.py b/tests/test_io.py index e3ff2815..09add95f 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -12,22 +12,24 @@ import pytest import zmq -import zmq.asyncio +import zmq_anyio from jupyter_client.session import Session from ipykernel.iostream import _PARENT, BackgroundSocket, IOPubThread, OutStream +pytestmark = pytest.mark.anyio + @pytest.fixture() def ctx(): - ctx = zmq.asyncio.Context() + ctx = zmq.Context() yield ctx ctx.destroy() @pytest.fixture() -def iopub_thread(ctx): - with ctx.socket(zmq.PUB) as pub: +async def iopub_thread(ctx): + async with zmq_anyio.Socket(ctx.socket(zmq.PUB)) as pub: thread = IOPubThread(pub) thread.start() @@ -36,7 +38,7 @@ def iopub_thread(ctx): thread.close() -def test_io_api(iopub_thread): +async def test_io_api(iopub_thread): """Test that wrapped stdout has the same API as a normal TextIO object""" session = Session() stream = OutStream(session, iopub_thread, "stdout") @@ -59,12 +61,13 @@ def test_io_api(iopub_thread): stream.write(b"") # type:ignore -def test_io_isatty(iopub_thread): +async def test_io_isatty(iopub_thread): session = Session() stream = OutStream(session, iopub_thread, "stdout", isatty=True) assert stream.isatty() +@pytest.mark.skip(reason="FIXME") async def test_io_thread(anyio_backend, iopub_thread): thread = iopub_thread thread._setup_pipe_in() @@ -150,61 +153,63 @@ async def test_event_pipe_gc(iopub_thread): # assert iopub_thread._event_pipes == {} -def subprocess_test_echo_watch(): +async def subprocess_test_echo_watch(): # handshake Pub subscription session = Session(key=b"abc") # use PUSH socket to avoid subscription issues - with zmq.asyncio.Context() as ctx, ctx.socket(zmq.PUSH) as pub: - pub.connect(os.environ["IOPUB_URL"]) - iopub_thread = IOPubThread(pub) - iopub_thread.start() - stdout_fd = sys.stdout.fileno() - sys.stdout.flush() - stream = OutStream( - session, - iopub_thread, - "stdout", - isatty=True, - echo=sys.stdout, - watchfd="force", - ) - save_stdout = sys.stdout - with stream, mock.patch.object(sys, "stdout", stream): - # write to low-level FD - os.write(stdout_fd, b"fd\n") - # print (writes to stream) - print("print\n", end="") + with zmq.Context() as ctx: + async with zmq_anyio.Socket(ctx.socket(zmq.PUSH)) as pub: + pub.connect(os.environ["IOPUB_URL"]) + iopub_thread = IOPubThread(pub) + iopub_thread.start() + stdout_fd = sys.stdout.fileno() sys.stdout.flush() - # write to unwrapped __stdout__ (should also go to original FD) - sys.__stdout__.write("__stdout__\n") - sys.__stdout__.flush() - # write to original sys.stdout (should be the same as __stdout__) - save_stdout.write("stdout\n") - save_stdout.flush() - # is there another way to flush on the FD? - fd_file = os.fdopen(stdout_fd, "w") - fd_file.flush() - # we don't have a sync flush on _reading_ from the watched pipe - time.sleep(1) - stream.flush() - iopub_thread.stop() - iopub_thread.close() + stream = OutStream( + session, + iopub_thread, + "stdout", + isatty=True, + echo=sys.stdout, + watchfd="force", + ) + save_stdout = sys.stdout + with stream, mock.patch.object(sys, "stdout", stream): + # write to low-level FD + os.write(stdout_fd, b"fd\n") + # print (writes to stream) + print("print\n", end="") + sys.stdout.flush() + # write to unwrapped __stdout__ (should also go to original FD) + sys.__stdout__.write("__stdout__\n") + sys.__stdout__.flush() + # write to original sys.stdout (should be the same as __stdout__) + save_stdout.write("stdout\n") + save_stdout.flush() + # is there another way to flush on the FD? + fd_file = os.fdopen(stdout_fd, "w") + fd_file.flush() + # we don't have a sync flush on _reading_ from the watched pipe + time.sleep(1) + stream.flush() + iopub_thread.stop() + iopub_thread.close() @pytest.mark.anyio() -@pytest.mark.skipif(sys.platform.startswith("win"), reason="Windows") +# @pytest.mark.skipif(sys.platform.startswith("win"), reason="Windows") +@pytest.mark.skip(reason="FIXME") async def test_echo_watch(ctx): """Test echo on underlying FD while capturing the same FD Test runs in a subprocess to avoid messing with pytest output capturing. """ - s = ctx.socket(zmq.PULL) + s = zmq_anyio.Socket(ctx.socket(zmq.PULL)) port = s.bind_to_random_port("tcp://127.0.0.1") url = f"tcp://127.0.0.1:{port}" session = Session(key=b"abc") stdout_chunks = [] - with s: + async with s: env = dict(os.environ) env["IOPUB_URL"] = url env["PYTHONUNBUFFERED"] = "1" @@ -224,8 +229,8 @@ async def test_echo_watch(ctx): print(f"{p.stdout=}") print(f"{p.stderr}=", file=sys.stderr) assert p.returncode == 0 - while await s.poll(timeout=100): - msg = await s.recv_multipart() + while await s.apoll(timeout=100): + msg = await s.arecv_multipart() ident, msg = session.feed_identities(msg, copy=True) msg = session.deserialize(msg, content=True, copy=True) assert msg is not None # for type narrowing From 615ec12d85cf14932d6c5bae90659e9b9cfba22f Mon Sep 17 00:00:00 2001 From: David Brochart Date: Fri, 15 Nov 2024 09:57:18 +0100 Subject: [PATCH 02/26] Replace thread add_task with start_soon --- ipykernel/kernelbase.py | 17 +++++++++-------- ipykernel/shellchannel.py | 2 +- ipykernel/subshell.py | 4 ++-- ipykernel/subshell_manager.py | 14 ++++++-------- ipykernel/thread.py | 35 +++++++++++++++++------------------ 5 files changed, 35 insertions(+), 37 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 3c7324d2..fcceef26 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -16,6 +16,7 @@ import uuid import warnings from datetime import datetime +from functools import partial from signal import SIGINT, SIGTERM, Signals from .thread import CONTROL_THREAD_NAME @@ -536,7 +537,7 @@ async def start(self, *, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None: self.control_stop = threading.Event() if not self._is_test and self.control_socket is not None: if self.control_thread: - self.control_thread.add_task(self.control_main) + self.control_thread.start_soon(self.control_main) self.control_thread.start() else: tg.start_soon(self.control_main) @@ -551,11 +552,11 @@ async def start(self, *, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None: # Assign tasks to and start shell channel thread. manager = self.shell_channel_thread.manager - self.shell_channel_thread.add_task(self.shell_channel_thread_main) - self.shell_channel_thread.add_task( - manager.listen_from_control, self.shell_main, self.shell_channel_thread + self.shell_channel_thread.start_soon(self.shell_channel_thread_main) + self.shell_channel_thread.start_soon( + partial(manager.listen_from_control, self.shell_main, self.shell_channel_thread) ) - self.shell_channel_thread.add_task(manager.listen_from_subshells) + self.shell_channel_thread.start_soon(manager.listen_from_subshells) self.shell_channel_thread.start() else: if not self._is_test and self.shell_socket is not None: @@ -1085,7 +1086,7 @@ async def create_subshell_request(self, socket, ident, parent) -> None: # This should only be called in the control thread if it exists. # Request is passed to shell channel thread to process. other_socket = await self.shell_channel_thread.manager.get_control_other_socket( - self.control_thread.get_task_group() + self.control_thread ) await other_socket.asend_json({"type": "create"}) reply = await other_socket.arecv_json() @@ -1109,7 +1110,7 @@ async def delete_subshell_request(self, socket, ident, parent) -> None: # This should only be called in the control thread if it exists. # Request is passed to shell channel thread to process. other_socket = await self.shell_channel_thread.manager.get_control_other_socket( - self.control_thread.get_task_group() + self.control_thread ) await other_socket.asend_json({"type": "delete", "subshell_id": subshell_id}) reply = await other_socket.arecv_json() @@ -1126,7 +1127,7 @@ async def list_subshell_request(self, socket, ident, parent) -> None: # This should only be called in the control thread if it exists. # Request is passed to shell channel thread to process. other_socket = await self.shell_channel_thread.manager.get_control_other_socket( - self.control_thread.get_task_group() + self.control_thread ) await other_socket.asend_json({"type": "list"}) reply = await other_socket.arecv_json() diff --git a/ipykernel/shellchannel.py b/ipykernel/shellchannel.py index 789a8875..819a0aec 100644 --- a/ipykernel/shellchannel.py +++ b/ipykernel/shellchannel.py @@ -28,7 +28,7 @@ def __init__( def manager(self) -> SubshellManager: # Lazy initialisation. if self._manager is None: - self._manager = SubshellManager(self._context, self._shell_socket, self.get_task_group) + self._manager = SubshellManager(self._context, self._shell_socket) return self._manager def run(self) -> None: diff --git a/ipykernel/subshell.py b/ipykernel/subshell.py index e84f5498..180e9ecb 100644 --- a/ipykernel/subshell.py +++ b/ipykernel/subshell.py @@ -25,13 +25,13 @@ async def create_pair_socket( ) -> None: """Create inproc PAIR socket, for communication with shell channel thread. - Should be called from this thread, so usually via add_task before the + Should be called from this thread, so usually via start_soon before the thread is started. """ assert current_thread() == self self._pair_socket = zmq_anyio.Socket(context, zmq.PAIR) self._pair_socket.connect(address) - self.add_task(self._pair_socket.start) + self.start_soon(self._pair_socket.start) def run(self) -> None: try: diff --git a/ipykernel/subshell_manager.py b/ipykernel/subshell_manager.py index dbd3da76..505c2f40 100644 --- a/ipykernel/subshell_manager.py +++ b/ipykernel/subshell_manager.py @@ -7,8 +7,8 @@ import typing as t import uuid from dataclasses import dataclass +from functools import partial from threading import Lock, current_thread, main_thread -from typing import Callable import zmq import zmq_anyio @@ -44,13 +44,11 @@ def __init__( self, context: zmq.Context, # type: ignore[type-arg] shell_socket: zmq_anyio.Socket, - get_task_group: Callable[[], TaskGroup], ): assert current_thread() == main_thread() self._context: zmq.Context = context # type: ignore[type-arg] self._shell_socket = shell_socket - self._get_task_group = get_task_group self._cache: dict[str, Subshell] = {} self._lock_cache = Lock() self._lock_shell_socket = Lock() @@ -91,9 +89,9 @@ def close(self) -> None: break self._stop_subshell(subshell) - async def get_control_other_socket(self, task_group: TaskGroup) -> zmq_anyio.Socket: + async def get_control_other_socket(self, thread: BaseThread) -> zmq_anyio.Socket: if not self._control_other_socket.started.is_set(): - task_group.start_soon(self._control_other_socket.start) + thread.start_soon(self._control_other_socket.start) await self._control_other_socket.started.wait() return self._control_other_socket @@ -134,7 +132,7 @@ async def listen_from_control(self, subshell_task: t.Any, thread: BaseThread) -> assert current_thread().name == SHELL_CHANNEL_THREAD_NAME if not self._control_shell_channel_socket.started.is_set(): - thread.get_task_group().start_soon(self._control_shell_channel_socket.start) + thread.start_soon(self._control_shell_channel_socket.start) await self._control_shell_channel_socket.started.wait() socket = self._control_shell_channel_socket while True: @@ -200,8 +198,8 @@ async def _create_subshell(self, subshell_task: t.Any) -> str: await self._send_stream.send(subshell_id) address = self._get_inproc_socket_address(subshell_id) - thread.add_task(thread.create_pair_socket, self._context, address) - thread.add_task(subshell_task, subshell_id) + thread.start_soon(partial(thread.create_pair_socket, self._context, address)) + thread.start_soon(partial(subshell_task, subshell_id)) thread.start() return subshell_id diff --git a/ipykernel/thread.py b/ipykernel/thread.py index f55ee7c7..df8fa412 100644 --- a/ipykernel/thread.py +++ b/ipykernel/thread.py @@ -1,9 +1,12 @@ """Base class for threads.""" -import typing as t -from threading import Event, Thread +from __future__ import annotations + +from collections.abc import Awaitable +from queue import Queue +from threading import Thread +from typing import Callable from anyio import create_task_group, run, to_thread -from anyio.abc import TaskGroup CONTROL_THREAD_NAME = "Control" SHELL_CHANNEL_THREAD_NAME = "Shell channel" @@ -17,26 +20,22 @@ def __init__(self, **kwargs): super().__init__(**kwargs) self.pydev_do_not_trace = True self.is_pydev_daemon_thread = True - self.__stop = Event() - self._tasks_and_args: list[tuple[t.Any, t.Any]] = [] - - def get_task_group(self) -> TaskGroup: - return self._task_group + self._tasks: Queue[Callable[[], Awaitable[None]] | None] = Queue() - def add_task(self, task: t.Any, *args: t.Any) -> None: - # May only add tasks before the thread is started. - self._tasks_and_args.append((task, args)) + def start_soon(self, task: Callable[[], Awaitable[None]] | None) -> None: + self._tasks.put(task) - def run(self) -> t.Any: + def run(self) -> None: """Run the thread.""" - return run(self._main) + run(self._main) async def _main(self) -> None: async with create_task_group() as tg: - self._task_group = tg - for task, args in self._tasks_and_args: - tg.start_soon(task, *args) - await to_thread.run_sync(self.__stop.wait) + while True: + task = await to_thread.run_sync(self._tasks.get) + if task is None: + break + tg.start_soon(task) tg.cancel_scope.cancel() def stop(self) -> None: @@ -44,4 +43,4 @@ def stop(self) -> None: This method is threadsafe. """ - self.__stop.set() + self._tasks.put(None) From 1834b58841d3421b4e83322e821559aba18eec75 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Fri, 15 Nov 2024 10:50:07 +0100 Subject: [PATCH 03/26] Replace _IOPubThread with BaseThread --- ipykernel/inprocess/socket.py | 6 ++++- ipykernel/iostream.py | 46 +++++++---------------------------- ipykernel/thread.py | 6 ++++- 3 files changed, 19 insertions(+), 39 deletions(-) diff --git a/ipykernel/inprocess/socket.py b/ipykernel/inprocess/socket.py index 5a2e0008..d14d0850 100644 --- a/ipykernel/inprocess/socket.py +++ b/ipykernel/inprocess/socket.py @@ -65,4 +65,8 @@ async def poll(self, timeout=0): return statistics.current_buffer_used != 0 def close(self): - pass + if self.is_shell: + self.in_send_stream.close() + self.in_receive_stream.close() + self.out_send_stream.close() + self.out_receive_stream.close() diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py index 19334212..02a0e22a 100644 --- a/ipykernel/iostream.py +++ b/ipykernel/iostream.py @@ -16,14 +16,16 @@ from binascii import b2a_hex from collections import defaultdict, deque from io import StringIO, TextIOBase -from threading import Event, Thread, local +from threading import local from typing import Any, Callable import zmq import zmq_anyio -from anyio import create_task_group, run, sleep, to_thread +from anyio import sleep from jupyter_client.session import extract_header +from .thread import BaseThread + # ----------------------------------------------------------------------------- # Globals # ----------------------------------------------------------------------------- @@ -38,38 +40,6 @@ # ----------------------------------------------------------------------------- -class _IOPubThread(Thread): - """A thread for a IOPub.""" - - def __init__(self, tasks, **kwargs): - """Initialize the thread.""" - super().__init__(name="IOPub", **kwargs) - self._tasks = tasks - self.pydev_do_not_trace = True - self.is_pydev_daemon_thread = True - self.daemon = True - self.__stop = Event() - - def run(self): - """Run the thread.""" - self.name = "IOPub" - run(self._main) - - async def _main(self): - async with create_task_group() as self._task_group: - for task in self._tasks: - self._task_group.start_soon(task) - await to_thread.run_sync(self.__stop.wait) - self._task_group.cancel_scope.cancel() - - def stop(self): - """Stop the thread. - - This method is threadsafe. - """ - self.__stop.set() - - class IOPubThread: """An object for sending IOPub messages in a background thread @@ -109,7 +79,9 @@ def __init__(self, socket: zmq_anyio.Socket, pipe=False): tasks = [self._handle_event, self._run_event_pipe_gc, self.socket.start] if pipe: tasks.append(self._handle_pipe_msgs) - self.thread = _IOPubThread(tasks) + self.thread = BaseThread(name="IOPub", daemon=True) + for task in tasks: + self.thread.start_soon(task) def _setup_event_pipe(self): """Create the PULL socket listening for events that should fire in this thread.""" @@ -179,7 +151,7 @@ async def _handle_event(self): event_f = self._events.popleft() event_f() except Exception: - if self.thread.__stop.is_set(): + if self.thread.stopped.is_set(): return raise @@ -211,7 +183,7 @@ async def _handle_pipe_msgs(self): while True: await self._handle_pipe_msg() except Exception: - if self.thread.__stop.is_set(): + if self.thread.stopped.is_set(): return raise diff --git a/ipykernel/thread.py b/ipykernel/thread.py index df8fa412..4c9edf86 100644 --- a/ipykernel/thread.py +++ b/ipykernel/thread.py @@ -3,7 +3,7 @@ from collections.abc import Awaitable from queue import Queue -from threading import Thread +from threading import Event, Thread from typing import Callable from anyio import create_task_group, run, to_thread @@ -18,6 +18,8 @@ class BaseThread(Thread): def __init__(self, **kwargs): """Initialize the thread.""" super().__init__(**kwargs) + self.started = Event() + self.stopped = Event() self.pydev_do_not_trace = True self.is_pydev_daemon_thread = True self._tasks: Queue[Callable[[], Awaitable[None]] | None] = Queue() @@ -31,6 +33,7 @@ def run(self) -> None: async def _main(self) -> None: async with create_task_group() as tg: + self.started.set() while True: task = await to_thread.run_sync(self._tasks.get) if task is None: @@ -44,3 +47,4 @@ def stop(self) -> None: This method is threadsafe. """ self._tasks.put(None) + self.stopped.set() From 657ca59c64a61b0b33a01031dc0018d3d65cb334 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Fri, 15 Nov 2024 11:36:11 +0100 Subject: [PATCH 04/26] Fix tests --- pyproject.toml | 3 +++ tests/test_io.py | 8 ++------ 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 5201e555..87d13e5c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -192,6 +192,9 @@ filterwarnings= [ # ignore unclosed sqlite in traits "ignore:unclosed database in .trigger_timeout' was never awaited", ] [tool.coverage.report] diff --git a/tests/test_io.py b/tests/test_io.py index 09add95f..9d14e5f2 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -67,7 +67,6 @@ async def test_io_isatty(iopub_thread): assert stream.isatty() -@pytest.mark.skip(reason="FIXME") async def test_io_thread(anyio_backend, iopub_thread): thread = iopub_thread thread._setup_pipe_in() @@ -80,8 +79,6 @@ async def test_io_thread(anyio_backend, iopub_thread): thread._really_send([b"hi"]) ctx1.destroy() thread.stop() - thread.close() - thread._really_send(None) async def test_background_socket(anyio_backend, iopub_thread): @@ -197,8 +194,7 @@ async def subprocess_test_echo_watch(): @pytest.mark.anyio() -# @pytest.mark.skipif(sys.platform.startswith("win"), reason="Windows") -@pytest.mark.skip(reason="FIXME") +@pytest.mark.skipif(sys.platform.startswith("win"), reason="Windows") async def test_echo_watch(ctx): """Test echo on underlying FD while capturing the same FD @@ -218,7 +214,7 @@ async def test_echo_watch(ctx): [ sys.executable, "-c", - f"import {__name__}; {__name__}.subprocess_test_echo_watch()", + f"import {__name__}, anyio; anyio.run({__name__}.subprocess_test_echo_watch)", ], env=env, capture_output=True, From 18e7317679f208cff9cecd64893e502c7868fef4 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Fri, 15 Nov 2024 15:02:25 +0100 Subject: [PATCH 05/26] Allow testing trio --- ipykernel/thread.py | 5 ++++- pyproject.toml | 1 + tests/conftest.py | 10 +++++----- tests/test_async.py | 3 +-- tests/test_eventloop.py | 1 + tests/test_io.py | 23 ++++++++++++----------- 6 files changed, 24 insertions(+), 19 deletions(-) diff --git a/ipykernel/thread.py b/ipykernel/thread.py index 4c9edf86..d853a2ad 100644 --- a/ipykernel/thread.py +++ b/ipykernel/thread.py @@ -29,7 +29,10 @@ def start_soon(self, task: Callable[[], Awaitable[None]] | None) -> None: def run(self) -> None: """Run the thread.""" - run(self._main) + try: + run(self._main) + except Exception: + pass async def _main(self) -> None: async with create_task_group() as tg: diff --git a/pyproject.toml b/pyproject.toml index 87d13e5c..7fb55eec 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -195,6 +195,7 @@ filterwarnings= [ # ignore timeout cancel coroutine not awaited in zmq-anyio "ignore: coroutine 'Poller._apoll..trigger_timeout' was never awaited", + "ignore: Unclosed socket" ] [tool.coverage.report] diff --git a/tests/conftest.py b/tests/conftest.py index fc798e74..db992b74 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,6 @@ import logging from math import inf +from threading import Event from typing import Any, Callable, no_type_check from unittest.mock import MagicMock @@ -21,11 +22,6 @@ resource = None # type:ignore -@pytest.fixture() -def anyio_backend(): - return "asyncio" - - pytestmark = pytest.mark.anyio @@ -159,6 +155,8 @@ class MockKernel(KernelMixin, Kernel): # type:ignore def __init__(self, *args, **kwargs): self._initialize() self.shell = MagicMock() + self.shell_stop = Event() + self.control_stop = Event() super().__init__(*args, **kwargs) def do_execute( @@ -180,6 +178,8 @@ def do_execute( class MockIPyKernel(KernelMixin, IPythonKernel): # type:ignore def __init__(self, *args, **kwargs): self._initialize() + self.shell_stop = Event() + self.control_stop = Event() super().__init__(*args, **kwargs) diff --git a/tests/test_async.py b/tests/test_async.py index f1d91c5e..c2dd980b 100644 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -8,8 +8,6 @@ from .test_message_spec import validate_message from .utils import TIMEOUT, execute, flush_channels, start_new_kernel -pytestmark = pytest.mark.anyio - KC = KM = None @@ -33,6 +31,7 @@ def test_async_await(): @pytest.mark.skipif(os.name == "nt", reason="Cannot interrupt on Windows") +@pytest.mark.parametrize("anyio_backend", ["asyncio"]) # FIXME: %autoawait trio def test_async_interrupt(anyio_backend, request): assert KC is not None assert KM is not None diff --git a/tests/test_eventloop.py b/tests/test_eventloop.py index 62a7f8ba..fcaa2bde 100644 --- a/tests/test_eventloop.py +++ b/tests/test_eventloop.py @@ -85,6 +85,7 @@ def do_thing(): @windows_skip +@pytest.mark.parametrize("anyio_backend", ["asyncio"]) def test_asyncio_loop(kernel): def do_thing(): loop.call_later(0.01, loop.stop) diff --git a/tests/test_io.py b/tests/test_io.py index 9d14e5f2..7b86a5d7 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -29,13 +29,16 @@ def ctx(): @pytest.fixture() async def iopub_thread(ctx): - async with zmq_anyio.Socket(ctx.socket(zmq.PUB)) as pub: - thread = IOPubThread(pub) - thread.start() + try: + async with zmq_anyio.Socket(ctx.socket(zmq.PUB)) as pub: + thread = IOPubThread(pub) + thread.start() - yield thread - thread.stop() - thread.close() + yield thread + thread.stop() + thread.close() + except Exception: + pass async def test_io_api(iopub_thread): @@ -67,7 +70,7 @@ async def test_io_isatty(iopub_thread): assert stream.isatty() -async def test_io_thread(anyio_backend, iopub_thread): +async def test_io_thread(iopub_thread): thread = iopub_thread thread._setup_pipe_in() msg = [thread._pipe_uuid, b"a"] @@ -81,7 +84,7 @@ async def test_io_thread(anyio_backend, iopub_thread): thread.stop() -async def test_background_socket(anyio_backend, iopub_thread): +async def test_background_socket(iopub_thread): sock = BackgroundSocket(iopub_thread) assert sock.__class__ == BackgroundSocket with warnings.catch_warnings(): @@ -92,7 +95,7 @@ async def test_background_socket(anyio_backend, iopub_thread): sock.send(b"hi") -async def test_outstream(anyio_backend, iopub_thread): +async def test_outstream(iopub_thread): session = Session() pub = iopub_thread.socket @@ -118,7 +121,6 @@ async def test_outstream(anyio_backend, iopub_thread): assert stream.writable() -@pytest.mark.anyio() async def test_event_pipe_gc(iopub_thread): session = Session(key=b"abc") stream = OutStream( @@ -193,7 +195,6 @@ async def subprocess_test_echo_watch(): iopub_thread.close() -@pytest.mark.anyio() @pytest.mark.skipif(sys.platform.startswith("win"), reason="Windows") async def test_echo_watch(ctx): """Test echo on underlying FD while capturing the same FD From cd9eed21590a91ba714a1ef35f8ec02f6061d4d9 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Wed, 20 Nov 2024 11:12:13 +0100 Subject: [PATCH 06/26] Remove pytest-asyncio from test dependencies --- pyproject.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 7fb55eec..6396c739 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,7 +63,6 @@ test = [ "pre-commit", "pytest-timeout", "trio", - "pytest-asyncio>=0.23.5", ] cov = [ "coverage[toml]", From 3f8e7dc837984a6a97b17fdcc975d0cb1f219bf3 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Wed, 20 Nov 2024 11:30:46 +0100 Subject: [PATCH 07/26] Use selector thread from anyio --- .github/workflows/ci.yml | 6 +++++- pyproject.toml | 3 +++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 70b3bf40..eccb4527 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -55,7 +55,11 @@ jobs: timeout-minutes: 15 if: ${{ startsWith(matrix.os, 'windows') }} run: | - hatch run cov:nowarn || hatch run test:nowarn --lf + hatch run test:pip install git+https://github.com/davidbrochart/zmq-anyio.git@anyio-selector-thread#egg=zmq_anyio --ignore-installed + hatch run test:pip install git+https://github.com/davidbrochart/anyio.git@selector-thread#egg=anyio --ignore-installed + hatch run test:pip list + hatch run test:python --version + hatch run test:pytest -v - name: Check Launcher run: | diff --git a/pyproject.toml b/pyproject.toml index 6396c739..c4313fd3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -319,3 +319,6 @@ ignore = ["W002"] [tool.repo-review] ignore = ["PY007", "PP308", "GH102", "MY101"] + +[tool.hatch.metadata] +allow-direct-references = true From 1422a27114d981e6775daa2172be4a901834ce58 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Wed, 20 Nov 2024 14:38:45 +0100 Subject: [PATCH 08/26] Remove base setup --- .github/workflows/ci.yml | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index eccb4527..31cd72d3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -36,8 +36,14 @@ jobs: - name: Checkout uses: actions/checkout@v4 - - name: Base Setup - uses: jupyterlab/maintainer-tools/.github/actions/base-setup@v1 + - uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install hatch + run: | + python --version + python -m pip install hatch - name: Run the tests timeout-minutes: 15 From ca62b0ff71c9d7398e093a2ed76fc03d2e757d27 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Wed, 20 Nov 2024 15:42:19 +0100 Subject: [PATCH 09/26] Test more Python versions on Windows --- .github/workflows/ci.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 31cd72d3..4e3290f1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -32,6 +32,12 @@ jobs: python-version: "3.11" - os: ubuntu-latest python-version: "3.12" + - os: windows-latest + python-version: "3.10" + - os: windows-latest + python-version: "3.11" + - os: windows-latest + python-version: "3.12" steps: - name: Checkout uses: actions/checkout@v4 From 3046de3ef5f095d2131c252572830f97bf59a600 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Sun, 1 Dec 2024 15:55:53 +0100 Subject: [PATCH 10/26] Use anyio's alternate selector thread --- .github/workflows/ci.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4e3290f1..4deab6cc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -68,7 +68,8 @@ jobs: if: ${{ startsWith(matrix.os, 'windows') }} run: | hatch run test:pip install git+https://github.com/davidbrochart/zmq-anyio.git@anyio-selector-thread#egg=zmq_anyio --ignore-installed - hatch run test:pip install git+https://github.com/davidbrochart/anyio.git@selector-thread#egg=anyio --ignore-installed + # hatch run test:pip install git+https://github.com/davidbrochart/anyio.git@selector-thread#egg=anyio --ignore-installed + hatch run test:pip install git+https://github.com/agronholm/anyio.git@selector-thread-alternate#egg=anyio --ignore-installed hatch run test:pip list hatch run test:python --version hatch run test:pytest -v From 25743c0069585f54262f2aff6aba2cc60f77c685 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Sun, 1 Dec 2024 19:56:40 +0100 Subject: [PATCH 11/26] - --- ipykernel/kernelbase.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index fcceef26..b837c2a5 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -382,7 +382,7 @@ async def shell_channel_thread_main(self): subshell_id ) assert socket is not None - socket.send_multipart(msg, copy=False) + await socket.asend_multipart(msg, copy=False) except Exception: self.log.error("Invalid message", exc_info=True) # noqa: G201 except BaseException: From 3a5b40a6e2f86d8bb65105c92ca6601dff06909c Mon Sep 17 00:00:00 2001 From: David Brochart Date: Sun, 1 Dec 2024 20:11:39 +0100 Subject: [PATCH 12/26] - --- ipykernel/kernelbase.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index b837c2a5..ed78fc21 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -366,7 +366,7 @@ async def shell_channel_thread_main(self): assert self.shell_channel_thread is not None assert threading.current_thread() == self.shell_channel_thread - async with self.shell_socket: + async with self.shell_socket, create_task_group() as tg: try: while True: msg = await self.shell_socket.arecv_multipart(copy=False) @@ -382,6 +382,8 @@ async def shell_channel_thread_main(self): subshell_id ) assert socket is not None + if not socket.started.is_set(): + await tg.start(socket.start) await socket.asend_multipart(msg, copy=False) except Exception: self.log.error("Invalid message", exc_info=True) # noqa: G201 From 4858e8296a7cdea302e1cf714e509cb4b47707b3 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Thu, 5 Dec 2024 17:29:04 +0100 Subject: [PATCH 13/26] - --- .github/workflows/ci.yml | 3 --- pyproject.toml | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4deab6cc..0cbf7841 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -67,9 +67,6 @@ jobs: timeout-minutes: 15 if: ${{ startsWith(matrix.os, 'windows') }} run: | - hatch run test:pip install git+https://github.com/davidbrochart/zmq-anyio.git@anyio-selector-thread#egg=zmq_anyio --ignore-installed - # hatch run test:pip install git+https://github.com/davidbrochart/anyio.git@selector-thread#egg=anyio --ignore-installed - hatch run test:pip install git+https://github.com/agronholm/anyio.git@selector-thread-alternate#egg=anyio --ignore-installed hatch run test:pip list hatch run test:python --version hatch run test:pytest -v diff --git a/pyproject.toml b/pyproject.toml index c4313fd3..fb4eff50 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,7 @@ dependencies = [ "psutil>=5.7", "packaging>=22", "anyio>=4.2.0", - "zmq-anyio >=0.2.3", + "zmq-anyio >=0.2.4", ] [project.urls] From e22861ac8d1847c26ee6a0441f1ab73f22fdd232 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Tue, 17 Dec 2024 09:54:16 +0100 Subject: [PATCH 14/26] - --- ipykernel/kernelbase.py | 2 +- ipykernel/subshell_manager.py | 14 ++++++-------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index ed78fc21..3ac67130 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -556,7 +556,7 @@ async def start(self, *, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None: manager = self.shell_channel_thread.manager self.shell_channel_thread.start_soon(self.shell_channel_thread_main) self.shell_channel_thread.start_soon( - partial(manager.listen_from_control, self.shell_main, self.shell_channel_thread) + partial(manager.listen_from_control, self.shell_main) ) self.shell_channel_thread.start_soon(manager.listen_from_subshells) self.shell_channel_thread.start() diff --git a/ipykernel/subshell_manager.py b/ipykernel/subshell_manager.py index 505c2f40..2636d157 100644 --- a/ipykernel/subshell_manager.py +++ b/ipykernel/subshell_manager.py @@ -125,20 +125,18 @@ def list_subshell(self) -> list[str]: with self._lock_cache: return list(self._cache) - async def listen_from_control(self, subshell_task: t.Any, thread: BaseThread) -> None: + async def listen_from_control(self, subshell_task: t.Any) -> None: """Listen for messages on the control inproc socket, handle those messages and return replies on the same socket. Runs in the shell channel thread. """ assert current_thread().name == SHELL_CHANNEL_THREAD_NAME - if not self._control_shell_channel_socket.started.is_set(): - thread.start_soon(self._control_shell_channel_socket.start) - await self._control_shell_channel_socket.started.wait() socket = self._control_shell_channel_socket - while True: - request = await socket.arecv_json() - reply = await self._process_control_request(request, subshell_task) - await socket.asend_json(reply) + async with socket: + while True: + request = await socket.arecv_json() + reply = await self._process_control_request(request, subshell_task) + await socket.asend_json(reply) async def listen_from_subshells(self) -> None: """Listen for reply messages on inproc sockets of all subshells and resend From 35a45d13861ecf670f80c1c8096d5bc4c0868721 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Tue, 17 Dec 2024 11:18:30 +0100 Subject: [PATCH 15/26] - --- ipykernel/subshell_manager.py | 2 +- ipykernel/thread.py | 34 +++++++++++++++++++++++++++++----- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/ipykernel/subshell_manager.py b/ipykernel/subshell_manager.py index 2636d157..b9dea456 100644 --- a/ipykernel/subshell_manager.py +++ b/ipykernel/subshell_manager.py @@ -91,7 +91,7 @@ def close(self) -> None: async def get_control_other_socket(self, thread: BaseThread) -> zmq_anyio.Socket: if not self._control_other_socket.started.is_set(): - thread.start_soon(self._control_other_socket.start) + thread.task_group.start_soon(self._control_other_socket.start) await self._control_other_socket.started.wait() return self._control_other_socket diff --git a/ipykernel/thread.py b/ipykernel/thread.py index d853a2ad..dc68bb3b 100644 --- a/ipykernel/thread.py +++ b/ipykernel/thread.py @@ -4,9 +4,10 @@ from collections.abc import Awaitable from queue import Queue from threading import Event, Thread -from typing import Callable +from typing import Any, Callable from anyio import create_task_group, run, to_thread +from anyio.abc import TaskGroup CONTROL_THREAD_NAME = "Control" SHELL_CHANNEL_THREAD_NAME = "Shell channel" @@ -22,10 +23,23 @@ def __init__(self, **kwargs): self.stopped = Event() self.pydev_do_not_trace = True self.is_pydev_daemon_thread = True - self._tasks: Queue[Callable[[], Awaitable[None]] | None] = Queue() + self._tasks: Queue[tuple[str, Callable[[], Awaitable[Any]]] | None] = Queue() + self._result: Queue[Any] = Queue() - def start_soon(self, task: Callable[[], Awaitable[None]] | None) -> None: - self._tasks.put(task) + @property + def task_group(self) -> TaskGroup: + return self._task_group + + def start_soon(self, coro: Callable[[], Awaitable[Any]]) -> None: + self._tasks.put(("start_soon", coro)) + + def run_async(self, coro: Callable[[], Awaitable[Any]]) -> Any: + self._tasks.put(("run_async", coro)) + return self._result.get() + + def run_sync(self, func: Callable[..., Any]) -> Any: + self._tasks.put(("run_sync", func)) + return self._result.get() def run(self) -> None: """Run the thread.""" @@ -36,12 +50,22 @@ def run(self) -> None: async def _main(self) -> None: async with create_task_group() as tg: + self._task_group = tg self.started.set() while True: task = await to_thread.run_sync(self._tasks.get) if task is None: break - tg.start_soon(task) + func, arg = task + if func == "start_soon": + tg.start_soon(arg) + elif func == "run_async": + res = await arg + self._result.put(res) + else: # func == "run_sync" + res = arg() + self._result.put(res) + tg.cancel_scope.cancel() def stop(self) -> None: From 7f11923c1a6e701e10d3068fa02462584c00902d Mon Sep 17 00:00:00 2001 From: David Brochart Date: Tue, 17 Dec 2024 13:42:41 +0100 Subject: [PATCH 16/26] - --- ipykernel/subshell_manager.py | 40 ++++++++++++++++++++++++++++------- 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/ipykernel/subshell_manager.py b/ipykernel/subshell_manager.py index b9dea456..2120abe1 100644 --- a/ipykernel/subshell_manager.py +++ b/ipykernel/subshell_manager.py @@ -56,15 +56,39 @@ def __init__( # Inproc pair sockets for control channel and main shell (parent subshell). # Each inproc pair has a "shell_channel" socket used in the shell channel # thread, and an "other" socket used in the other thread. - self._control_shell_channel_socket = self._create_inproc_pair_socket("control", True) - self._control_other_socket = self._create_inproc_pair_socket("control", False) - self._parent_shell_channel_socket = self._create_inproc_pair_socket(None, True) - self._parent_other_socket = self._create_inproc_pair_socket(None, False) + self.__control_shell_channel_socket: zmq_anyio.Socket | None = None + self.__control_other_socket: zmq_anyio.Socket | None = None + self.__parent_shell_channel_socket: zmq_anyio.Socket | None = None + self.__parent_other_socket: zmq_anyio.Socket | None = None # anyio memory object stream for async queue-like communication between tasks. # Used by _create_subshell to tell listen_from_subshells to spawn a new task. self._send_stream, self._receive_stream = create_memory_object_stream[str]() + @property + def _control_shell_channel_socket(self) -> zmq_anyio.Socket: + if self.__control_shell_channel_socket is None: + self.__control_shell_channel_socket = self._create_inproc_pair_socket("control", True) + return self.__control_shell_channel_socket + + @property + def _control_other_socket(self) -> zmq_anyio.Socket: + if self.__control_other_socket is None: + self.__control_other_socket = self._create_inproc_pair_socket("control", False) + return self.__control_other_socket + + @property + def _parent_shell_channel_socket(self) -> zmq_anyio.Socket: + if self.__parent_shell_channel_socket is None: + self.__parent_shell_channel_socket = self._create_inproc_pair_socket(None, True) + return self.__parent_shell_channel_socket + + @property + def _parent_other_socket(self) -> zmq_anyio.Socket: + if self.__parent_other_socket is None: + self.__parent_other_socket = self._create_inproc_pair_socket(None, False) + return self.__parent_other_socket + def close(self) -> None: """Stop all subshells and close all resources.""" assert current_thread().name == SHELL_CHANNEL_THREAD_NAME @@ -73,10 +97,10 @@ def close(self) -> None: self._receive_stream.close() for socket in ( - self._control_shell_channel_socket, - self._control_other_socket, - self._parent_shell_channel_socket, - self._parent_other_socket, + self.__control_shell_channel_socket, + self.__control_other_socket, + self.__parent_shell_channel_socket, + self.__parent_other_socket, ): if socket is not None: socket.close() From 32253f846e9aba2af62d7dd2f48d0575096ba941 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Tue, 17 Dec 2024 16:16:14 +0100 Subject: [PATCH 17/26] - --- tests/test_kernel.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_kernel.py b/tests/test_kernel.py index 8efc3dcc..428d8750 100644 --- a/tests/test_kernel.py +++ b/tests/test_kernel.py @@ -62,7 +62,7 @@ def test_simple_print(): def test_print_to_correct_cell_from_thread(): """should print to the cell that spawned the thread, not a subsequently run cell""" iterations = 5 - interval = 0.25 + interval = 1 code = f"""\ from threading import Thread from time import sleep @@ -94,7 +94,7 @@ def thread_target(): def test_print_to_correct_cell_from_child_thread(): """should print to the cell that spawned the thread, not a subsequently run cell""" iterations = 5 - interval = 0.25 + interval = 1 code = f"""\ from threading import Thread from time import sleep @@ -130,7 +130,7 @@ def parent_target(): def test_print_to_correct_cell_from_asyncio(): """should print to the cell that scheduled the task, not a subsequently run cell""" iterations = 5 - interval = 0.25 + interval = 1 code = f"""\ import asyncio From c4bf3b7f75ad6cdebe8c8ed5c0a544895638b612 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Tue, 17 Dec 2024 16:40:37 +0100 Subject: [PATCH 18/26] - --- tests/test_kernel.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/test_kernel.py b/tests/test_kernel.py index 428d8750..aea2408d 100644 --- a/tests/test_kernel.py +++ b/tests/test_kernel.py @@ -83,6 +83,8 @@ def thread_target(): msg = kc.get_iopub_msg(timeout=interval * 2) if msg["msg_type"] != "stream": continue + print(f"{thread_msg_id=}") + print(f"{msg=}") content = msg["content"] assert content["name"] == "stdout" assert content["text"] == str(received) @@ -119,6 +121,8 @@ def parent_target(): msg = kc.get_iopub_msg(timeout=interval * 2) if msg["msg_type"] != "stream": continue + print(f"{thread_msg_id=}") + print(f"{msg=}") content = msg["content"] assert content["name"] == "stdout" assert content["text"] == str(received) @@ -151,6 +155,8 @@ async def async_task(): msg = kc.get_iopub_msg(timeout=interval * 2) if msg["msg_type"] != "stream": continue + print(f"{thread_msg_id=}") + print(f"{msg=}") content = msg["content"] assert content["name"] == "stdout" assert content["text"] == str(received) From 82ef8c43d3a191167556e787e276b8417f6c46ad Mon Sep 17 00:00:00 2001 From: David Brochart Date: Tue, 17 Dec 2024 17:49:27 +0100 Subject: [PATCH 19/26] Enable tracemalloc --- .github/workflows/ci.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0cbf7841..172e8a6f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -55,13 +55,13 @@ jobs: timeout-minutes: 15 if: ${{ !startsWith( matrix.python-version, 'pypy' ) && !startsWith(matrix.os, 'windows') }} run: | - hatch run cov:test --cov-fail-under 50 || hatch run test:test --lf + PYTHONTRACEMALLOC=20 hatch run cov:test --cov-fail-under 50 || PYTHONTRACEMALLOC=20 hatch run test:test --lf - name: Run the tests on pypy timeout-minutes: 15 if: ${{ startsWith( matrix.python-version, 'pypy' ) }} run: | - hatch run test:nowarn || hatch run test:nowarn --lf + PYTHONTRACEMALLOC=20 hatch run test:nowarn || PYTHONTRACEMALLOC=20 hatch run test:nowarn --lf - name: Run the tests on Windows timeout-minutes: 15 @@ -69,7 +69,7 @@ jobs: run: | hatch run test:pip list hatch run test:python --version - hatch run test:pytest -v + PYTHONTRACEMALLOC=20 hatch run test:pytest -v - name: Check Launcher run: | @@ -152,7 +152,7 @@ jobs: - name: Run the tests timeout-minutes: 15 - run: pytest -W default -vv || pytest --vv -W default --lf + run: PYTHONTRACEMALLOC=20 pytest -W default -vv || PYTHONTRACEMALLOC=20 pytest --vv -W default --lf test_miniumum_versions: name: Test Minimum Versions From 057f62cfc4e159e6e864944c1b6f1a15c8a3903b Mon Sep 17 00:00:00 2001 From: M Bussonnier Date: Tue, 17 Dec 2024 20:47:13 +0100 Subject: [PATCH 20/26] Update .github/workflows/ci.yml --- .github/workflows/ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 172e8a6f..5ecd4610 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -69,6 +69,7 @@ jobs: run: | hatch run test:pip list hatch run test:python --version + hatch run test:python -m pip install pip install git+https://github.com/ipython/ipython@92dd9e47fe8862ee38770744c165b680cb5241b1 PYTHONTRACEMALLOC=20 hatch run test:pytest -v - name: Check Launcher From 03a878d594af17885e1bf561647eb66fd8e2e502 Mon Sep 17 00:00:00 2001 From: M Bussonnier Date: Tue, 17 Dec 2024 20:55:51 +0100 Subject: [PATCH 21/26] Update .github/workflows/ci.yml --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5ecd4610..1e7636d3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -69,7 +69,7 @@ jobs: run: | hatch run test:pip list hatch run test:python --version - hatch run test:python -m pip install pip install git+https://github.com/ipython/ipython@92dd9e47fe8862ee38770744c165b680cb5241b1 + hatch run test:python -m pip install git+https://github.com/ipython/ipython@92dd9e47fe8862ee38770744c165b680cb5241b1 PYTHONTRACEMALLOC=20 hatch run test:pytest -v - name: Check Launcher From 25f939df831924442fc1d4d6aab8b02fdb42f168 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Fri, 20 Dec 2024 10:13:43 +0100 Subject: [PATCH 22/26] - --- .github/workflows/ci.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1e7636d3..4bea26fc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -55,13 +55,13 @@ jobs: timeout-minutes: 15 if: ${{ !startsWith( matrix.python-version, 'pypy' ) && !startsWith(matrix.os, 'windows') }} run: | - PYTHONTRACEMALLOC=20 hatch run cov:test --cov-fail-under 50 || PYTHONTRACEMALLOC=20 hatch run test:test --lf + PYTHONTRACEMALLOC=20 hatch run cov:test --cov-fail-under 50 -k test_print_to_correct_cell || PYTHONTRACEMALLOC=20 hatch run test:test --lf -k test_print_to_correct_cell - name: Run the tests on pypy timeout-minutes: 15 if: ${{ startsWith( matrix.python-version, 'pypy' ) }} run: | - PYTHONTRACEMALLOC=20 hatch run test:nowarn || PYTHONTRACEMALLOC=20 hatch run test:nowarn --lf + PYTHONTRACEMALLOC=20 hatch run test:nowarn -k test_print_to_correct_cell || PYTHONTRACEMALLOC=20 hatch run test:nowarn --lf -k test_print_to_correct_cell - name: Run the tests on Windows timeout-minutes: 15 @@ -69,8 +69,8 @@ jobs: run: | hatch run test:pip list hatch run test:python --version - hatch run test:python -m pip install git+https://github.com/ipython/ipython@92dd9e47fe8862ee38770744c165b680cb5241b1 - PYTHONTRACEMALLOC=20 hatch run test:pytest -v + #hatch run test:python -m pip install git+https://github.com/ipython/ipython@92dd9e47fe8862ee38770744c165b680cb5241b1 + PYTHONTRACEMALLOC=20 hatch run test:pytest -v -k test_print_to_correct_cell - name: Check Launcher run: | From 9de65b2e006415d7eebee04276c18588af22c264 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Fri, 20 Dec 2024 10:31:12 +0100 Subject: [PATCH 23/26] - --- .github/workflows/ci.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4bea26fc..2c7aefbb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -55,13 +55,13 @@ jobs: timeout-minutes: 15 if: ${{ !startsWith( matrix.python-version, 'pypy' ) && !startsWith(matrix.os, 'windows') }} run: | - PYTHONTRACEMALLOC=20 hatch run cov:test --cov-fail-under 50 -k test_print_to_correct_cell || PYTHONTRACEMALLOC=20 hatch run test:test --lf -k test_print_to_correct_cell + PYTHONTRACEMALLOC=20 hatch run cov:test --cov-fail-under 50 -k test_print_to_correct_cell_from_child_thread || PYTHONTRACEMALLOC=20 hatch run test:test --lf -k test_print_to_correct_cell_from_child_thread - name: Run the tests on pypy timeout-minutes: 15 if: ${{ startsWith( matrix.python-version, 'pypy' ) }} run: | - PYTHONTRACEMALLOC=20 hatch run test:nowarn -k test_print_to_correct_cell || PYTHONTRACEMALLOC=20 hatch run test:nowarn --lf -k test_print_to_correct_cell + PYTHONTRACEMALLOC=20 hatch run test:nowarn -k test_print_to_correct_cell_from_child_thread || PYTHONTRACEMALLOC=20 hatch run test:nowarn --lf -k test_print_to_correct_cell_from_child_thread - name: Run the tests on Windows timeout-minutes: 15 @@ -70,7 +70,7 @@ jobs: hatch run test:pip list hatch run test:python --version #hatch run test:python -m pip install git+https://github.com/ipython/ipython@92dd9e47fe8862ee38770744c165b680cb5241b1 - PYTHONTRACEMALLOC=20 hatch run test:pytest -v -k test_print_to_correct_cell + PYTHONTRACEMALLOC=20 hatch run test:pytest -v -k test_print_to_correct_cell_from_child_thread - name: Check Launcher run: | From b9f9f7301f4a8580974cb4fe58113f257e6d773d Mon Sep 17 00:00:00 2001 From: David Brochart Date: Fri, 20 Dec 2024 13:25:27 +0100 Subject: [PATCH 24/26] - --- .github/workflows/ci.yml | 6 +++--- tests/test_kernel.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2c7aefbb..6bde2ef7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -55,13 +55,13 @@ jobs: timeout-minutes: 15 if: ${{ !startsWith( matrix.python-version, 'pypy' ) && !startsWith(matrix.os, 'windows') }} run: | - PYTHONTRACEMALLOC=20 hatch run cov:test --cov-fail-under 50 -k test_print_to_correct_cell_from_child_thread || PYTHONTRACEMALLOC=20 hatch run test:test --lf -k test_print_to_correct_cell_from_child_thread + PYTHONTRACEMALLOC=20 hatch run cov:test --cov-fail-under 50 || PYTHONTRACEMALLOC=20 hatch run test:test --lf - name: Run the tests on pypy timeout-minutes: 15 if: ${{ startsWith( matrix.python-version, 'pypy' ) }} run: | - PYTHONTRACEMALLOC=20 hatch run test:nowarn -k test_print_to_correct_cell_from_child_thread || PYTHONTRACEMALLOC=20 hatch run test:nowarn --lf -k test_print_to_correct_cell_from_child_thread + PYTHONTRACEMALLOC=20 hatch run test:nowarn || PYTHONTRACEMALLOC=20 hatch run test:nowarn --lf - name: Run the tests on Windows timeout-minutes: 15 @@ -70,7 +70,7 @@ jobs: hatch run test:pip list hatch run test:python --version #hatch run test:python -m pip install git+https://github.com/ipython/ipython@92dd9e47fe8862ee38770744c165b680cb5241b1 - PYTHONTRACEMALLOC=20 hatch run test:pytest -v -k test_print_to_correct_cell_from_child_thread + PYTHONTRACEMALLOC=20 hatch run test:pytest -v - name: Check Launcher run: | diff --git a/tests/test_kernel.py b/tests/test_kernel.py index aea2408d..e27cb0d7 100644 --- a/tests/test_kernel.py +++ b/tests/test_kernel.py @@ -107,8 +107,8 @@ def child_target(): sleep({interval}) def parent_target(): - sleep({interval}) Thread(target=child_target).start() + sleep({interval * iterations}) Thread(target=parent_target).start() """ From e87ba37b22a7134085d605461d45fc749d15e7a6 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Fri, 20 Dec 2024 15:40:04 +0100 Subject: [PATCH 25/26] - --- tests/test_io.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_io.py b/tests/test_io.py index 7b86a5d7..9615538f 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -35,8 +35,9 @@ async def iopub_thread(ctx): thread.start() yield thread - thread.stop() + thread.close() + thread.stop() except Exception: pass From f0818b688ca67e4fa104af0591b58bbe13b78c78 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Fri, 20 Dec 2024 16:06:53 +0100 Subject: [PATCH 26/26] - --- tests/test_io.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/tests/test_io.py b/tests/test_io.py index 9615538f..aca2694e 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -29,17 +29,11 @@ def ctx(): @pytest.fixture() async def iopub_thread(ctx): - try: - async with zmq_anyio.Socket(ctx.socket(zmq.PUB)) as pub: - thread = IOPubThread(pub) - thread.start() - - yield thread + async with zmq_anyio.Socket(ctx.socket(zmq.PUB)) as pub: + thread = IOPubThread(pub) + thread.start() - thread.close() - thread.stop() - except Exception: - pass + yield thread async def test_io_api(iopub_thread):