Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent missing IOPub on restart #358

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion jupyter_server/services/kernels/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def _handle_kernel_info_reply(self, msg):

enabling msg spec adaptation, if necessary
"""
idents,msg = self.session.feed_identities(msg)
idents, msg = self.session.feed_identities(msg)
try:
msg = self.session.deserialize(msg)
except:
Expand Down
44 changes: 35 additions & 9 deletions jupyter_server/services/kernels/kernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from datetime import datetime, timedelta
from functools import partial
import os
import time

from tornado import web
from tornado.concurrent import Future
Expand Down Expand Up @@ -320,37 +321,62 @@ async def restart_kernel(self, kernel_id):
await ensure_async(self.pinned_superclass.restart_kernel(self, kernel_id))
kernel = self.get_kernel(kernel_id)
# Return a Future that resolves once the restarted kernel has replied to a kernel_info_request and its first IOPub message has been received.
channel = kernel.connect_shell()
shell_channel = kernel.connect_shell()
iopub_channel = kernel.connect_iopub()
future = Future()
info_future = Future()
iopub_future = Future()

def finish():
"""Common cleanup when restart finishes/fails for any reason."""
if not channel.closed():
channel.close()
loop.remove_timeout(timeout)
kernel.remove_restart_callback(on_restart_failed, 'dead')

def on_reply(msg):
def on_shell_reply(msg):
self.log.debug("Kernel info reply received: %s", kernel_id)
finish()
if not future.done():
future.set_result(msg)
shell_channel.close()
if not info_future.done():
info_future.set_result(msg)
if iopub_future.done():
finish()
future.set_result(info_future.result())

def on_iopub(msg):
self.log.debug("first IOPub received: %s", kernel_id)
iopub_channel.close()
if not iopub_future.done():
iopub_future.set_result(None)
if info_future.done():
finish()
future.set_result(info_future.result())

def on_timeout():
self.log.warning("Timeout waiting for kernel_info_reply: %s", kernel_id)
if not shell_channel.closed():
shell_channel.close()
if not iopub_channel.closed():
iopub_channel.close()
finish()
if not future.done():
future.set_exception(TimeoutError("Timeout waiting for restart"))

def on_restart_failed():
self.log.warning("Restarting kernel failed: %s", kernel_id)
if not shell_channel.closed():
shell_channel.close()
if not iopub_channel.closed():
iopub_channel.close()
finish()
if not future.done():
future.set_exception(RuntimeError("Restart failed"))

kernel.add_restart_callback(on_restart_failed, 'dead')
kernel.session.send(channel, "kernel_info_request")
channel.on_recv(on_reply)
iopub_channel.on_recv(on_iopub)
shell_channel.on_recv(on_shell_reply)
while not future.done():
time.sleep(0.2)
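# Nudge the kernel with kernel info requests until we get an IOPub message.
# NOTE(review): time.sleep() in this loop blocks the thread running this coroutine. If the on_recv callbacks are dispatched by the same event loop, they cannot fire while the loop spins and `future` would never complete; the timeout is also only registered after the loop. Confirm, and prefer an awaitable sleep.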
kernel.session.send(shell_channel, "kernel_info_request")
loop = IOLoop.current()
timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout)
return future
Expand Down