Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle Iopub welcome message for jep#65 #1303

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 73 additions & 2 deletions jupyter_server/services/kernels/connection/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,20 @@ def create_stream(self):
self.channels[channel] = stream = meth(identity=identity)
stream.channel = channel

def _is_iopub_welcome_supported(self):
"""Check if the messaging protocol supports sending
Iopub welcome messages (i.e protocol_version >= 5.4)
"""
MIN_MAJOR_VERSION = 5
MIN_MINOR_VERSION = 4
protocol_version_parts = client_protocol_version.split(".")
if int(protocol_version_parts[0]) > MIN_MAJOR_VERSION:
return True
return bool(
int(protocol_version_parts[0]) == MIN_MAJOR_VERSION
and int(protocol_version_parts[1]) >= MIN_MINOR_VERSION
)

def nudge(self): # noqa
"""Nudge the zmq connections with kernel_info_requests
Returns a Future that will resolve when we have received
Expand Down Expand Up @@ -268,6 +282,63 @@ def nudge(count):
future.add_done_callback(finish)
return _ensure_future(future)

def wait_for_iopub_welcome(self):
"""Wait for an iopub welcome message
Since protocol_version >= 5.4
"""
iopub_channel = self.channels["iopub"]
iopub_future: Future = Future()

def on_iopub(msg):
"""Handle iopub welcome message"""
idents, msg = self.session.feed_identities(msg)
try:
msg = self.session.deserialize(msg)
except BaseException:
self.log.error("Bad Iopub message", exc_info=True)
iopub_channel.stop_on_recv()
iopub_future.set_result(None)
else:
msg_type = msg["header"]["msg_type"]
if msg_type == "iopub_welcome":
self.log.debug("Iopub welcome message received: %s", self.kernel_id)
iopub_channel.stop_on_recv()
iopub_future.set_result(msg)
else:
self.log.error(
"Iopub welcome message not received, receiving %s instead", msg_type
)
iopub_channel.stop_on_recv()
iopub_future.set_result(None)

iopub_channel.on_recv(on_iopub)
loop = IOLoop.current()

def give_up():
"""Don't wait forever for the kernel"""
if iopub_future.done():
return
self.log.error("Timeout waiting for IOPub welcome message from %s", self.kernel_id)
iopub_channel.stop_on_recv()
iopub_future.set_result(None)

# Resolve with a timeout if we get no response
timeout_future = gen.with_timeout(loop.time() + self.kernel_info_timeout, iopub_future)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if we should use kernel_info_timeout or something else for the timeout?

timeout_future.add_done_callback(give_up)
return _ensure_future(timeout_future)

def is_subscribed(self):
"""Check zmq subscriptions depending on the protocol version.
Either with iopub welcome message if supported, or with `nudge`
"""
if self._is_iopub_welcome_supported():
return self.wait_for_iopub_welcome()
else:
self.log.warning(
"Be aware that using an old kernel protocol may involve possible loss of early iopub messages"
)
return self.nudge()

async def _register_session(self):
"""Ensure we aren't creating a duplicate session.

Expand Down Expand Up @@ -342,7 +413,7 @@ def connect(self):
# The kernel's ports have not changed; use the channels captured in the buffer
self.channels = buffer_info["channels"]

connected = self.nudge()
connected = self.is_subscribed()

def replay(value):
replay_buffer = buffer_info["buffer"]
Expand All @@ -356,7 +427,7 @@ def replay(value):
else:
try:
self.create_stream()
connected = self.nudge()
connected = self.is_subscribed()
except web.HTTPError as e:
# Do not log error if the kernel is already shutdown,
# as it's normal that it's not responding
Expand Down