diff --git a/notebook/services/kernels/handlers.py b/notebook/services/kernels/handlers.py index 42b2f76a92..c2a8946bfb 100644 --- a/notebook/services/kernels/handlers.py +++ b/notebook/services/kernels/handlers.py @@ -78,9 +78,8 @@ def post(self, kernel_id, action): yield maybe_future(km.interrupt_kernel(kernel_id)) self.set_status(204) if action == 'restart': - try: - yield maybe_future(km.restart_kernel(kernel_id)) + yield maybe_future(km.restart_kernel(kernel_id, channels)) except Exception as e: self.log.error("Exception restarting kernel", exc_info=True) self.set_status(500) diff --git a/notebook/services/kernels/kernelmanager.py b/notebook/services/kernels/kernelmanager.py index 61cbbe58f5..2ebcd297ee 100644 --- a/notebook/services/kernels/kernelmanager.py +++ b/notebook/services/kernels/kernelmanager.py @@ -304,33 +304,54 @@ def shutdown_kernel(self, kernel_id, now=False, restart=False): return self.pinned_superclass.shutdown_kernel(self, kernel_id, now=now, restart=restart) - async def restart_kernel(self, kernel_id, now=False): + async def restart_kernel(self, kernel_id, channels, now=False): """Restart a kernel by kernel_id""" self._check_kernel_id(kernel_id) await maybe_future(self.pinned_superclass.restart_kernel(self, kernel_id, now=now)) kernel = self.get_kernel(kernel_id) # return a Future that will resolve when the kernel has successfully restarted - channel = kernel.connect_shell() + shell_channel = self.channels['shell'] + iopub_channel = self.channels['iopub'] + future = Future() + info_future = Future() + iopub_future = Future() def finish(): - """Common cleanup when restart finishes/fails for any reason.""" - if not channel.closed(): - channel.close() + """Common cleanup""" loop.remove_timeout(timeout) + loop.remove_timeout(nudge_handle) + iopub_channel.stop_on_recv() + shell_channel.stop_on_recv() kernel.remove_restart_callback(on_restart_failed, 'dead') - def on_reply(msg): - self.log.debug("Kernel info reply received: %s", kernel_id) - finish() - if not future.done(): - future.set_result(msg) + def on_shell_reply(msg): + if not info_future.done(): + self.log.debug("Nudge: shell info reply received: %s", self.kernel_id) + shell_channel.stop_on_recv() + self.log.debug("Nudge: resolving shell future") + info_future.set_result(msg) + if iopub_future.done(): + finish() + self.log.debug("Nudge: resolving main future in shell handler") + future.set_result(info_future.result()) + + def on_iopub(msg): + if not iopub_future.done(): + self.log.debug("Nudge: first IOPub received: %s", self.kernel_id) + iopub_channel.stop_on_recv() + self.log.debug("Nudge: resolving iopub future") + iopub_future.set_result(None) + if info_future.done(): + finish() + self.log.debug("Nudge: resolving main future in iopub handler") + future.set_result(info_future.result()) def on_timeout(): - self.log.warning("Timeout waiting for kernel_info_reply: %s", kernel_id) + self.log.warning("Nudge: Timeout waiting for kernel_info_reply: %s", self.kernel_id) finish() if not future.done(): - future.set_exception(TimeoutError("Timeout waiting for restart")) + future.set_exception(TimeoutError("Timeout waiting for nudge")) def on_restart_failed(): self.log.warning("Restarting kernel failed: %s", kernel_id) @@ -339,10 +360,20 @@ def on_restart_failed(): future.set_exception(RuntimeError("Restart failed")) kernel.add_restart_callback(on_restart_failed, 'dead') - kernel.session.send(channel, "kernel_info_request") - channel.on_recv(on_reply) + + iopub_channel.on_recv(on_iopub) + shell_channel.on_recv(on_shell_reply) loop = IOLoop.current() - timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout) + + # Nudge the kernel with kernel info requests until we get an IOPub message + def nudge(): + self.log.debug("Nudge") + if not future.done(): + self.log.debug("nudging") + self.session.send(shell_channel, "kernel_info_request") + nudge_handle = loop.call_later(0.5, nudge) + nudge_handle = loop.call_later(0, nudge) + return future def notify_connect(self, kernel_id):