From 8f7349104eded221f3d87500b4a4177997248dde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Collonval?= Date: Tue, 14 May 2024 11:44:32 +0200 Subject: [PATCH 1/4] Use non-blocking zmq Poller --- jupyter_client/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/jupyter_client/client.py b/jupyter_client/client.py index aa353ac2..851a2345 100644 --- a/jupyter_client/client.py +++ b/jupyter_client/client.py @@ -530,7 +530,7 @@ async def _async_execute_interactive( else: timeout_ms = None - poller = zmq.Poller() + poller = zmq.asyncio.Poller() iopub_socket = self.iopub_channel.socket poller.register(iopub_socket, zmq.POLLIN) if allow_stdin: @@ -544,7 +544,7 @@ async def _async_execute_interactive( if timeout is not None: timeout = max(0, deadline - time.monotonic()) timeout_ms = int(1000 * timeout) - events = dict(poller.poll(timeout_ms)) + events = dict(await poller.poll(timeout_ms)) if not events: emsg = "Timeout waiting for output" raise TimeoutError(emsg) From 41479de0e9170a4a014fb1b6976fad9b809e0cd0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Collonval?= Date: Tue, 14 May 2024 15:01:36 +0200 Subject: [PATCH 2/4] Pin pytest<8.2.0 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index c2be8205..8a0ea4f4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,7 +51,7 @@ test = [ "mypy", "paramiko; sys_platform == 'win32'", "pre-commit", - "pytest", + "pytest<8.2.0", "pytest-jupyter[client]>=0.4.1", "pytest-cov", "pytest-timeout", From a563aa6d6f2c45b5191799940987425fbbcd05ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Collonval?= Date: Tue, 14 May 2024 15:49:52 +0200 Subject: [PATCH 3/4] Fix mypy --- jupyter_client/asynchronous/client.py | 2 +- jupyter_client/channels.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/jupyter_client/asynchronous/client.py b/jupyter_client/asynchronous/client.py index 11873416..cde8ecaf 100644 --- a/jupyter_client/asynchronous/client.py +++ b/jupyter_client/asynchronous/client.py @@ -33,7 +33,7 @@ class AsyncKernelClient(KernelClient): raising :exc:`queue.Empty` if no message arrives within ``timeout`` seconds. """ - context = Instance(zmq.asyncio.Context) + context = Instance(zmq.asyncio.Context) # type:ignore[arg-type] def _context_default(self) -> zmq.asyncio.Context: self._created_context = True diff --git a/jupyter_client/channels.py b/jupyter_client/channels.py index c645b134..ea1f4cbf 100644 --- a/jupyter_client/channels.py +++ b/jupyter_client/channels.py @@ -220,7 +220,7 @@ def _recv(self, **kwargs: t.Any) -> t.Dict[str, t.Any]: ident, smsg = self.session.feed_identities(msg) return self.session.deserialize(smsg) - def get_msg(self, timeout: t.Optional[float] = None) -> t.Dict[str, t.Any]: + def get_msg(self, timeout: t.Optional[int] = None) -> t.Dict[str, t.Any]: """Gets a message if there is one that is ready.""" assert self.socket is not None if timeout is not None: From 733bd59f4fad158d1adf1067293ed2afe84830ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Collonval?= Date: Wed, 15 May 2024 10:28:13 +0200 Subject: [PATCH 4/4] Apply review comment --- jupyter_client/channels.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/jupyter_client/channels.py b/jupyter_client/channels.py index ea1f4cbf..465dccdf 100644 --- a/jupyter_client/channels.py +++ b/jupyter_client/channels.py @@ -220,12 +220,11 @@ def _recv(self, **kwargs: t.Any) -> t.Dict[str, t.Any]: ident, smsg = self.session.feed_identities(msg) return self.session.deserialize(smsg) - def get_msg(self, timeout: t.Optional[int] = None) -> t.Dict[str, t.Any]: + def get_msg(self, timeout: t.Optional[float] = None) -> t.Dict[str, t.Any]: """Gets a message if there is one that is ready.""" assert self.socket is not None - if timeout is not None: - timeout *= 1000 # seconds to ms - ready = self.socket.poll(timeout) + timeout_ms = None if timeout is None else int(timeout * 1000) # seconds to ms + ready = self.socket.poll(timeout_ms) if ready: res = self._recv() return res @@ -305,9 +304,8 @@ async def get_msg( # type:ignore[override] ) -> t.Dict[str, t.Any]: """Gets a message if there is one that is ready.""" assert self.socket is not None - if timeout is not None: - timeout *= 1000 # seconds to ms - ready = await self.socket.poll(timeout) + timeout_ms = None if timeout is None else int(timeout * 1000) # seconds to ms + ready = await self.socket.poll(timeout_ms) if ready: res = await self._recv() return res