From f1031ed3449a38d4652b24562bc4af4b2789a39c Mon Sep 17 00:00:00 2001 From: Sylvain Corlay Date: Fri, 11 Dec 2020 22:09:42 +0100 Subject: [PATCH] Nudge kernel with info request until we receive IOPub messages --- jupyter_server/gateway/managers.py | 2 +- jupyter_server/services/kernels/handlers.py | 78 +++++++++++++++++-- .../services/kernels/kernelmanager.py | 68 ++++++++++++---- 3 files changed, 125 insertions(+), 23 deletions(-) diff --git a/jupyter_server/gateway/managers.py b/jupyter_server/gateway/managers.py index 855c18aeaf..e4cb33ab41 100644 --- a/jupyter_server/gateway/managers.py +++ b/jupyter_server/gateway/managers.py @@ -442,7 +442,7 @@ async def shutdown_kernel(self, kernel_id, now=False, restart=False): self.log.debug("Shutdown kernel response: %d %s", response.code, response.reason) self.remove_kernel(kernel_id) - async def restart_kernel(self, kernel_id, now=False, **kwargs): + async def restart_kernel(self, kernel_id, channels=None, now=False, **kwargs): """Restart a kernel by its kernel uuid. Parameters diff --git a/jupyter_server/services/kernels/handlers.py b/jupyter_server/services/kernels/handlers.py index e581c30471..1db9a3c7d6 100644 --- a/jupyter_server/services/kernels/handlers.py +++ b/jupyter_server/services/kernels/handlers.py @@ -77,9 +77,8 @@ async def post(self, kernel_id, action): await ensure_async(km.interrupt_kernel(kernel_id)) self.set_status(204) if action == 'restart': - try: - await km.restart_kernel(kernel_id) + await km.restart_kernel(kernel_id, km.channels) except Exception as e: self.log.error("Exception restarting kernel", exc_info=True) self.set_status(500) @@ -127,6 +126,64 @@ def create_stream(self): self.channels[channel] = stream = meth(self.kernel_id, identity=identity) stream.channel = channel + shell_channel = self.channels['shell'] + iopub_channel = self.channels['iopub'] + + future = Future() + info_future = Future() + iopub_future = Future() + + def finish(): + """Common cleanup""" + loop.remove_timeout(timeout) + loop.remove_timeout(nudge_handle) + iopub_channel.stop_on_recv() + shell_channel.stop_on_recv() + + def on_shell_reply(msg): + if not info_future.done(): + self.log.debug("Nudge: shell info reply received: %s", self.kernel_id) + shell_channel.stop_on_recv() + self.log.debug("Nudge: resolving shell future") + info_future.set_result(msg) + if iopub_future.done(): + finish() + self.log.debug("Nudge: resolving main future in shell handler") + future.set_result(info_future.result()) + + def on_iopub(msg): + if not iopub_future.done(): + self.log.debug("Nudge: first IOPub received: %s", self.kernel_id) + iopub_channel.stop_on_recv() + self.log.debug("Nudge: resolving iopub future") + iopub_future.set_result(None) + if info_future.done(): + finish() + self.log.debug("Nudge: resolving main future in iopub handler") + future.set_result(info_future.result()) + + def on_timeout(): + self.log.warning("Nudge: Timeout waiting for kernel_info_reply: %s", self.kernel_id) + finish() + if not future.done(): + future.set_exception(TimeoutError("Timeout waiting for nudge")) + + iopub_channel.on_recv(on_iopub) + shell_channel.on_recv(on_shell_reply) + loop = IOLoop.current() + + # Nudge the kernel with kernel info requests until we get an IOPub message + def nudge(): + self.log.debug("Nudge") + if not future.done(): + self.log.debug("nudging") + self.session.send(shell_channel, "kernel_info_request") + nudge_handle = loop.call_later(0.5, nudge) + nudge_handle = loop.call_later(0, nudge) + + timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout) + return future + def request_kernel_info(self): """send a request for kernel_info""" km = self.kernel_manager @@ -192,6 +249,7 @@ def initialize(self): super(ZMQChannelsHandler, self).initialize() self.zmq_stream = None self.channels = {} + self.kernel_manager.channels = self.channels self.kernel_id = None self.kernel_info_channel = None self._kernel_info_future = Future() @@ -249,7 +307,7 @@ async def _register_session(self): await stale_handler.close() self._open_sessions[self.session_key] = self - def open(self, kernel_id): + async def open(self, kernel_id): super(ZMQChannelsHandler, self).open() km = self.kernel_manager km.notify_connect(kernel_id) @@ -265,9 +323,11 @@ def open(self, kernel_id): for channel, msg_list in replay_buffer: stream = self.channels[channel] self._on_zmq_reply(stream, msg_list) + connected = Future() + connected.set_result(None) else: try: - self.create_stream() + connected = self.create_stream() except web.HTTPError as e: self.log.error("Error opening stream: %s", e) # WebSockets don't response to traditional error codes so we @@ -281,8 +341,14 @@ def open(self, kernel_id): km.add_restart_callback(self.kernel_id, self.on_kernel_restarted) km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead') - for channel, stream in self.channels.items(): - stream.on_recv_stream(self._on_zmq_reply) + def subscribe(value): + for channel, stream in self.channels.items(): + stream.on_recv_stream(self._on_zmq_reply) + + connected.add_done_callback(subscribe) + + return connected + def on_message(self, msg): if not self.channels: diff --git a/jupyter_server/services/kernels/kernelmanager.py b/jupyter_server/services/kernels/kernelmanager.py index 3b12108b3a..4cc761baec 100644 --- a/jupyter_server/services/kernels/kernelmanager.py +++ b/jupyter_server/services/kernels/kernelmanager.py @@ -314,33 +314,59 @@ def shutdown_kernel(self, kernel_id, now=False, restart=False): return self.pinned_superclass.shutdown_kernel(self, kernel_id, now=now, restart=restart) - async def restart_kernel(self, kernel_id): + async def restart_kernel(self, kernel_id, channels): """Restart a kernel by kernel_id""" self._check_kernel_id(kernel_id) - await ensure_async(self.pinned_superclass.restart_kernel(self, kernel_id)) + await ensure_async(self.pinned_superclass.restart_kernel(self, kernel_id, channels)) kernel = self.get_kernel(kernel_id) # return a Future that will resolve when the kernel has successfully restarted - channel = kernel.connect_shell() + shell_channel = self.channels['shell'] + iopub_channel = self.channels['iopub'] + + session = Session( + config=kernel.session.config, + key=kernel.session.key, + ) + future = Future() + info_future = Future() + iopub_future = Future() def finish(): - """Common cleanup when restart finishes/fails for any reason.""" - if not channel.closed(): - channel.close() + """Common cleanup""" loop.remove_timeout(timeout) + loop.remove_timeout(nudge_handle) + iopub_channel.stop_on_recv() + shell_channel.stop_on_recv() kernel.remove_restart_callback(on_restart_failed, 'dead') - def on_reply(msg): - self.log.debug("Kernel info reply received: %s", kernel_id) - finish() - if not future.done(): - future.set_result(msg) + def on_shell_reply(msg): + if not info_future.done(): + self.log.debug("Nudge: shell info reply received: %s", self.kernel_id) + shell_channel.stop_on_recv() + self.log.debug("Nudge: resolving shell future") + info_future.set_result(msg) + if iopub_future.done(): + finish() + self.log.debug("Nudge: resolving main future in shell handler") + future.set_result(info_future.result()) + + def on_iopub(msg): + if not iopub_future.done(): + self.log.debug("Nudge: first IOPub received: %s", self.kernel_id) + iopub_channel.stop_on_recv() + self.log.debug("Nudge: resolving iopub future") + iopub_future.set_result(None) + if info_future.done(): + finish() + self.log.debug("Nudge: resolving main future in iopub handler") + future.set_result(info_future.result()) def on_timeout(): - self.log.warning("Timeout waiting for kernel_info_reply: %s", kernel_id) + self.log.warning("Nudge: Timeout waiting for kernel_info_reply: %s", self.kernel_id) finish() if not future.done(): - future.set_exception(TimeoutError("Timeout waiting for restart")) + future.set_exception(TimeoutError("Timeout waiting for nudge")) def on_restart_failed(): self.log.warning("Restarting kernel failed: %s", kernel_id) @@ -349,10 +375,20 @@ def on_restart_failed(): future.set_exception(RuntimeError("Restart failed")) kernel.add_restart_callback(on_restart_failed, 'dead') - kernel.session.send(channel, "kernel_info_request") - channel.on_recv(on_reply) + + iopub_channel.on_recv(on_iopub) + shell_channel.on_recv(on_shell_reply) loop = IOLoop.current() - timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout) + + # Nudge the kernel with kernel info requests until we get an IOPub message + def nudge(): + self.log.debug("Nudge") + if not future.done(): + self.log.debug("nudging") + session.send(shell_channel, "kernel_info_request") + nudge_handle = loop.call_later(0.5, nudge) + nudge_handle = loop.call_later(0, nudge) + return future def notify_connect(self, kernel_id):