Skip to content

Commit

Permalink
Rework wait_for_ready logic
Browse files Browse the repository at this point in the history
  • Loading branch information
SylvainCorlay committed Dec 10, 2020
1 parent 996b53e commit 06a32c4
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 24 deletions.
11 changes: 9 additions & 2 deletions jupyter_client/asynchronous/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,21 @@ async def wait_for_ready(self, timeout=None):

# Wait for kernel info reply on shell channel
while True:
self.kernel_info()
try:
msg = await self.shell_channel.get_msg(timeout=1)
except Empty:
pass
else:
if msg['msg_type'] == 'kernel_info_reply':
self._handle_kernel_info_reply(msg)
break
# Checking that IOPub is connected. If it is not connected, start over.
try:
await self.iopub_channel.get_msg(timeout=0.2)
except Empty:
pass
else:
self._handle_kernel_info_reply(msg)
break

if not await self.is_alive():
raise RuntimeError('Kernel died before replying to kernel_info')
Expand Down
11 changes: 9 additions & 2 deletions jupyter_client/blocking/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,21 @@ def wait_for_ready(self, timeout=None):

# Wait for kernel info reply on shell channel
while True:
self.kernel_info()
try:
msg = self.shell_channel.get_msg(block=True, timeout=1)
except Empty:
pass
else:
if msg['msg_type'] == 'kernel_info_reply':
self._handle_kernel_info_reply(msg)
break
# Checking that IOPub is connected. If it is not connected, start over.
try:
self.iopub_channel.get_msg(block=True, timeout=0.2)
except Empty:
pass
else:
self._handle_kernel_info_reply(msg)
break

if not self.is_alive():
raise RuntimeError('Kernel died before replying to kernel_info')
Expand Down
5 changes: 2 additions & 3 deletions jupyter_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,10 @@ def start_channels(self, shell=True, iopub=True, stdin=True, hb=True, control=Tr
:meth:`start_kernel`. If the channels have been stopped and you
call this, :class:`RuntimeError` will be raised.
"""
if shell:
self.shell_channel.start()
self.kernel_info()
if iopub:
self.iopub_channel.start()
if shell:
self.shell_channel.start()
if stdin:
self.stdin_channel.start()
self.allow_stdin = True
Expand Down
7 changes: 0 additions & 7 deletions jupyter_client/tests/signalkernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,6 @@ def do_execute(self, code, silent, store_history=True, user_expressions=None,
reply['traceback'] = ['no such command: %s' % code]
return reply

def kernel_info_request(self, *args, **kwargs):
"""Add delay to kernel_info_request
triggers slow-response code in KernelClient.wait_for_ready
"""
return super().kernel_info_request(*args, **kwargs)


class SignalTestApp(IPKernelApp):
kernel_class = SignalTestKernel
Expand Down
35 changes: 25 additions & 10 deletions jupyter_client/tests/test_kernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,11 @@ def test_signal_kernel_subprocesses(self, install_kernel, start_kernel):
km, kc = start_kernel

def execute(cmd):
kc.execute(cmd)
reply = kc.get_shell_msg(TIMEOUT)
request_id = kc.execute(cmd)
while True:
reply = kc.get_shell_msg(TIMEOUT)
if reply['parent_header']['msg_id'] == request_id:
break
content = reply['content']
assert content['status'] == 'ok'
return content
Expand Down Expand Up @@ -172,8 +175,11 @@ def test_start_new_kernel(self, install_kernel, start_kernel):

def _env_test_body(self, kc):
def execute(cmd):
kc.execute(cmd)
reply = kc.get_shell_msg(TIMEOUT)
request_id = kc.execute(cmd)
while True:
reply = kc.get_shell_msg(TIMEOUT)
if reply['parent_header']['msg_id'] == request_id:
break
content = reply['content']
assert content['status'] == 'ok'
return content
Expand Down Expand Up @@ -274,8 +280,11 @@ def _run_signaltest_lifecycle(self, config=None):
kc = self._prepare_kernel(km, stdout=PIPE, stderr=PIPE)

def execute(cmd):
kc.execute(cmd)
reply = kc.get_shell_msg(TIMEOUT)
request_id = kc.execute(cmd)
while True:
reply = kc.get_shell_msg(TIMEOUT)
if reply['parent_header']['msg_id'] == request_id:
break
content = reply['content']
assert content['status'] == 'ok'
return content
Expand Down Expand Up @@ -344,8 +353,11 @@ async def test_signal_kernel_subprocesses(self, install_kernel, start_async_kern
km, kc = start_async_kernel

async def execute(cmd):
kc.execute(cmd)
reply = await kc.get_shell_msg(TIMEOUT)
request_id = kc.execute(cmd)
while True:
reply = await kc.get_shell_msg(TIMEOUT)
if reply['parent_header']['msg_id'] == request_id:
break
content = reply['content']
assert content['status'] == 'ok'
return content
Expand All @@ -360,10 +372,13 @@ async def execute(cmd):
assert reply['user_expressions']['poll'] == [None] * N

# start a job on the kernel to be interrupted
kc.execute('sleep')
request_id = kc.execute('sleep')
await asyncio.sleep(1) # ensure sleep message has been handled before we interrupt
await km.interrupt_kernel()
reply = await kc.get_shell_msg(TIMEOUT)
while True:
reply = await kc.get_shell_msg(TIMEOUT)
if reply['parent_header']['msg_id'] == request_id:
break
content = reply['content']
assert content['status'] == 'ok'
assert content['user_expressions']['interrupted'] is True
Expand Down

0 comments on commit 06a32c4

Please sign in to comment.