diff --git a/notebook/services/kernels/handlers.py b/notebook/services/kernels/handlers.py index 73da737b150..42b2f76a926 100644 --- a/notebook/services/kernels/handlers.py +++ b/notebook/services/kernels/handlers.py @@ -121,12 +121,71 @@ def __repr__(self): return "%s(%s)" % (self.__class__.__name__, getattr(self, 'kernel_id', 'uninitialized')) def create_stream(self): + self.log.debug("Create stream") km = self.kernel_manager identity = self.session.bsession for channel in ('shell', 'control', 'iopub', 'stdin'): meth = getattr(km, 'connect_' + channel) self.channels[channel] = stream = meth(self.kernel_id, identity=identity) stream.channel = channel + + shell_channel = self.channels['shell'] + iopub_channel = self.channels['iopub'] + + future = Future() + info_future = Future() + iopub_future = Future() + + def finish(): + """Common cleanup""" + loop.remove_timeout(timeout) + loop.remove_timeout(nudge_handle) + iopub_channel.stop_on_recv() + shell_channel.stop_on_recv() + + def on_shell_reply(msg): + if not info_future.done(): + self.log.debug("Nudge: shell info reply received: %s", self.kernel_id) + shell_channel.stop_on_recv() + self.log.debug("Nudge: resolving shell future") + info_future.set_result(msg) + if iopub_future.done(): + finish() + self.log.debug("Nudge: resolving main future in shell handler") + future.set_result(info_future.result()) + + def on_iopub(msg): + if not iopub_future.done(): + self.log.debug("Nudge: first IOPub received: %s", self.kernel_id) + iopub_channel.stop_on_recv() + self.log.debug("Nudge: resolving iopub future") + iopub_future.set_result(None) + if info_future.done(): + finish() + self.log.debug("Nudge: resolving main future in iopub handler") + future.set_result(info_future.result()) + + def on_timeout(): + self.log.warning("Nudge: Timeout waiting for kernel_info_reply: %s", self.kernel_id) + finish() + if not future.done(): + future.set_exception(TimeoutError("Timeout waiting for nudge")) + + iopub_channel.on_recv(on_iopub) + shell_channel.on_recv(on_shell_reply) + loop = IOLoop.current() + + # Nudge the kernel with kernel info requests until we get an IOPub message + def nudge(): + self.log.debug("Nudge") + if not future.done(): + self.log.debug("nudging") + self.session.send(shell_channel, "kernel_info_request") + nudge_handle = loop.call_later(0.5, nudge) + nudge_handle = loop.call_later(0, nudge) + + timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout) + return future def request_kernel_info(self): """send a request for kernel_info""" @@ -253,6 +312,7 @@ def _register_session(self): yield stale_handler.close() self._open_sessions[self.session_key] = self + @gen.coroutine def open(self, kernel_id): super().open() km = self.kernel_manager @@ -269,9 +329,11 @@ def open(self, kernel_id): for channel, msg_list in replay_buffer: stream = self.channels[channel] self._on_zmq_reply(stream, msg_list) + connected = Future() + connected.set_result(None) else: try: - self.create_stream() + connected = self.create_stream() except web.HTTPError as e: self.log.error("Error opening stream: %s", e) # WebSockets don't response to traditional error codes so we @@ -285,8 +347,13 @@ def open(self, kernel_id): km.add_restart_callback(self.kernel_id, self.on_kernel_restarted) km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead') - for channel, stream in self.channels.items(): - stream.on_recv_stream(self._on_zmq_reply) + def subscribe(value): + for channel, stream in self.channels.items(): + stream.on_recv_stream(self._on_zmq_reply) + + connected.add_done_callback(subscribe) + + return connected def on_message(self, msg): if not self.channels: