From aca54f716f1a51ec778f8f7813e62e8745dce69c Mon Sep 17 00:00:00 2001 From: Sylvain Corlay Date: Sat, 12 Dec 2020 10:52:15 +0100 Subject: [PATCH] Nudge kernel with info request until we receive IOPub messages --- jupyter_server/services/kernels/handlers.py | 90 ++++++++++++++++++--- 1 file changed, 81 insertions(+), 9 deletions(-) diff --git a/jupyter_server/services/kernels/handlers.py b/jupyter_server/services/kernels/handlers.py index e581c30471..ada458da95 100644 --- a/jupyter_server/services/kernels/handlers.py +++ b/jupyter_server/services/kernels/handlers.py @@ -127,6 +127,65 @@ def create_stream(self): self.channels[channel] = stream = meth(self.kernel_id, identity=identity) stream.channel = channel + def nudge(self): + shell_channel = self.channels['shell'] + iopub_channel = self.channels['iopub'] + + future = Future() + info_future = Future() + iopub_future = Future() + + def finish(): + """Common cleanup""" + loop.remove_timeout(timeout) + loop.remove_timeout(nudge_handle) + iopub_channel.stop_on_recv() + shell_channel.stop_on_recv() + + def on_shell_reply(msg): + if not info_future.done(): + self.log.debug("Nudge: shell info reply received: %s", self.kernel_id) + shell_channel.stop_on_recv() + self.log.debug("Nudge: resolving shell future") + info_future.set_result(msg) + if iopub_future.done(): + finish() + self.log.debug("Nudge: resolving main future in shell handler") + future.set_result(info_future.result()) + + def on_iopub(msg): + if not iopub_future.done(): + self.log.debug("Nudge: first IOPub received: %s", self.kernel_id) + iopub_channel.stop_on_recv() + self.log.debug("Nudge: resolving iopub future") + iopub_future.set_result(None) + if info_future.done(): + finish() + self.log.debug("Nudge: resolving main future in iopub handler") + future.set_result(info_future.result()) + + def on_timeout(): + self.log.warning("Nudge: Timeout waiting for kernel_info_reply: %s", self.kernel_id) + finish() + if not future.done(): + future.set_exception(TimeoutError("Timeout waiting for nudge")) + + iopub_channel.on_recv(on_iopub) + shell_channel.on_recv(on_shell_reply) + loop = IOLoop.current() + + # Nudge the kernel with kernel info requests until we get an IOPub message + def nudge(): + self.log.debug("Nudge") + if not future.done(): + self.log.debug("nudging") + self.session.send(shell_channel, "kernel_info_request") + nudge_handle = loop.call_later(0.5, nudge) + nudge_handle = loop.call_later(0, nudge) + + timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout) + return future + def request_kernel_info(self): """send a request for kernel_info""" km = self.kernel_manager @@ -249,7 +308,7 @@ async def _register_session(self): await stale_handler.close() self._open_sessions[self.session_key] = self - def open(self, kernel_id): + async def open(self, kernel_id): super(ZMQChannelsHandler, self).open() km = self.kernel_manager km.notify_connect(kernel_id) @@ -259,15 +318,22 @@ def open(self, kernel_id): if buffer_info and buffer_info['session_key'] == self.session_key: self.log.info("Restoring connection for %s", self.session_key) self.channels = buffer_info['channels'] - replay_buffer = buffer_info['buffer'] - if replay_buffer: - self.log.info("Replaying %s buffered messages", len(replay_buffer)) - for channel, msg_list in replay_buffer: - stream = self.channels[channel] - self._on_zmq_reply(stream, msg_list) + + connected = self.nudge() + + def replay(value): + replay_buffer = buffer_info['buffer'] + if replay_buffer: + self.log.info("Replaying %s buffered messages", len(replay_buffer)) + for channel, msg_list in replay_buffer: + stream = self.channels[channel] + self._on_zmq_reply(stream, msg_list) + + connected.add_done_callback(replay) else: try: self.create_stream() + connected = self.nudge() except web.HTTPError as e: self.log.error("Error opening stream: %s", e) # WebSockets don't response to traditional error codes so we @@ -281,8 +347,14 @@ def open(self, kernel_id): km.add_restart_callback(self.kernel_id, self.on_kernel_restarted) km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead') - for channel, stream in self.channels.items(): - stream.on_recv_stream(self._on_zmq_reply) + def subscribe(value): + for channel, stream in self.channels.items(): + stream.on_recv_stream(self._on_zmq_reply) + + connected.add_done_callback(subscribe) + + return connected + def on_message(self, msg): if not self.channels: