diff --git a/jupyter_server/services/kernels/handlers.py b/jupyter_server/services/kernels/handlers.py
index e581c30471..1b23edb228 100644
--- a/jupyter_server/services/kernels/handlers.py
+++ b/jupyter_server/services/kernels/handlers.py
@@ -155,7 +155,7 @@ def _handle_kernel_info_reply(self, msg):
 
         enabling msg spec adaptation, if necessary
         """
-        idents,msg = self.session.feed_identities(msg)
+        idents, msg = self.session.feed_identities(msg)
         try:
             msg = self.session.deserialize(msg)
         except:
diff --git a/jupyter_server/services/kernels/kernelmanager.py b/jupyter_server/services/kernels/kernelmanager.py
index 3b12108b3a..3071fd7a7a 100644
--- a/jupyter_server/services/kernels/kernelmanager.py
+++ b/jupyter_server/services/kernels/kernelmanager.py
@@ -320,37 +320,70 @@ async def restart_kernel(self, kernel_id):
         await ensure_async(self.pinned_superclass.restart_kernel(self, kernel_id))
         kernel = self.get_kernel(kernel_id)
         # return a Future that will resolve when the kernel has successfully restarted
-        channel = kernel.connect_shell()
+        shell_channel = kernel.connect_shell()
+        iopub_channel = kernel.connect_iopub()
         future = Future()
+        info_future = Future()
+        iopub_future = Future()
 
         def finish():
             """Common cleanup when restart finishes/fails for any reason."""
-            if not channel.closed():
-                channel.close()
+            # Close both channels here so that every exit path (success, timeout,
+            # restart failure) releases them.
+            if not shell_channel.closed():
+                shell_channel.close()
+            if not iopub_channel.closed():
+                iopub_channel.close()
             loop.remove_timeout(timeout)
             kernel.remove_restart_callback(on_restart_failed, 'dead')
 
-        def on_reply(msg):
+        def resolve_if_ready():
+            """Resolve `future` once the info reply and an IOPub message both arrived."""
+            if info_future.done() and iopub_future.done():
+                finish()
+                if not future.done():
+                    future.set_result(info_future.result())
+
+        def on_shell_reply(msg):
             self.log.debug("Kernel info reply received: %s", kernel_id)
-            finish()
-            if not future.done():
-                future.set_result(msg)
+            # Keep the shell channel open: nudges are still sent on it until IOPub answers.
+            if not info_future.done():
+                info_future.set_result(msg)
+            resolve_if_ready()
+
+        def on_iopub(msg):
+            self.log.debug("first IOPub received: %s", kernel_id)
+            # Only the first IOPub message matters, so stop listening right away.
+            iopub_channel.close()
+            if not iopub_future.done():
+                iopub_future.set_result(None)
+            resolve_if_ready()
 
         def on_timeout():
             self.log.warning("Timeout waiting for kernel_info_reply: %s", kernel_id)
             finish()
             if not future.done():
                 future.set_exception(TimeoutError("Timeout waiting for restart"))
 
         def on_restart_failed():
             self.log.warning("Restarting kernel failed: %s", kernel_id)
             finish()
             if not future.done():
                 future.set_exception(RuntimeError("Restart failed"))
 
+        def nudge():
+            """Send kernel_info_request, then repeat every 0.2s until `future` is done."""
+            if future.done():
+                return
+            kernel.session.send(shell_channel, "kernel_info_request")
+            # Reschedule on the IOLoop instead of sleeping so the callbacks above can run.
+            loop.call_later(0.2, nudge)
+
         kernel.add_restart_callback(on_restart_failed, 'dead')
-        kernel.session.send(channel, "kernel_info_request")
-        channel.on_recv(on_reply)
+        iopub_channel.on_recv(on_iopub)
+        shell_channel.on_recv(on_shell_reply)
         loop = IOLoop.current()
         timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout)
+        # Nudge the kernel with kernel info requests until we get an IOPub message
+        nudge()
         return future