Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve startup, error handling and shutdown of Jupyter kernels #4364

Merged
merged 1 commit into from
Jan 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions panel/io/jupyter_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from bokeh.server.views.ws import WSHandler
from bokeh.util.token import get_session_id, get_token_payload
from ipykernel.comm import Comm
from IPython.display import HTML, publish_display_data

from ..util import edit_readonly
from .resources import Resources
Expand All @@ -38,6 +37,14 @@ class _RequestProxy:
cookies: Dict[str, str]
headers: Dict[str, str | List[str]]

class Mimebundle:

def __init__(self, mimebundle):
self._mimebundle = mimebundle

def _repr_mimebundle_(self, include=None, exclude=None):
return self._mimebundle, {}


class PanelExecutor(WSHandler):
"""
Expand Down Expand Up @@ -66,8 +73,11 @@ def __init__(self, path, token, root_url):
path_versioner=StaticHandler.append_version
)
self._set_state()
self.session = self._create_server_session()
self.connection = ServerConnection(self.protocol, self, None, self.session)
try:
self.session = self._create_server_session()
self.connection = ServerConnection(self.protocol, self, None, self.session)
except Exception:
self.session = None

def _get_payload(self, token: str) -> Dict[str, Any]:
payload = get_token_payload(token)
Expand Down Expand Up @@ -145,7 +155,6 @@ def _create_server_session(self) -> ServerSession:
session_context._set_session(session)
return session


async def write_message(
self, message: Union[bytes, str, Dict[str, Any]],
binary: bool = False, locked: bool = True
Expand All @@ -156,11 +165,13 @@ async def write_message(
else:
self.comm.send(message, metadata=metadata)

def render(self) -> HTML:
def render(self) -> Mimebundle:
"""
Renders the application to an IPython.display.HTML object to
be served by the `PanelJupyterHandler`.
"""
if self.session is None:
return Mimebundle({'text/error': 'Session did not start correctly'})
with set_curdoc(self.session.document):
html = server_html_page_for_session(
self.session,
Expand All @@ -169,5 +180,4 @@ def render(self) -> HTML:
template=self.session.document.template,
template_variables=self.session.document.template_variables
)
publish_display_data({'application/bokeh-extensions': extension_dirs})
return HTML(html)
return Mimebundle({'text/html': html, 'application/bokeh-extensions': extension_dirs})
20 changes: 12 additions & 8 deletions panel/io/jupyter_server_extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,17 +169,21 @@ async def _get_info(self, msg_id, timeout=KERNEL_TIMEOUT):
msg = await ensure_async(self.kernel.iopub_channel.get_msg(timeout=None))
except Empty:
if not await ensure_async(self.kernel.is_alive()):
raise RuntimeError("Kernel died before establishing Comm connection to Panel app.")
raise RuntimeError("Kernel died before establishing Comm connection to Panel application.")
continue
if msg['parent_header'].get('msg_id') != msg_id:
continue
msg_type = msg['header']['msg_type']
if msg_type == 'display_data' and 'application/bokeh-extensions' in msg['content']['data']:
extension_dirs = msg['content']['data']['application/bokeh-extensions']
elif msg_type == 'execute_result':
result = msg
if msg_type == 'execute_result':
data = msg['content']['data']
if 'text/error' in data:
raise RuntimeError("Panel application errored during startup.")
extension_dirs = data['application/bokeh-extensions']
result = data['text/html']
elif msg_type == 'comm_open' and msg['content']['target_name'] == self.session_id:
comm_id = msg['content']['comm_id']
elif msg_type == 'stream' and msg['content']['name'] == 'stderr':
logger.info(msg['content']['text'])
return result, comm_id, extension_dirs

@tornado.web.authenticated
Expand Down Expand Up @@ -262,7 +266,7 @@ async def get(self, path=None):

# Wait for comm to open and rendered HTML to be returned by the kernel
try:
msg, comm_id, ext_dirs = await self._get_info(msg_id)
html, comm_id, ext_dirs = await self._get_info(msg_id)
except (TimeoutError, RuntimeError) as e:
await self.kernel_manager.shutdown_kernel(kernel_id, now=True)
html = ERROR_TEMPLATE.render(
Expand All @@ -280,7 +284,6 @@ async def get(self, path=None):
state._kernels[self.session_id] = (self.kernel, comm_id, kernel_id, False)
loop = tornado.ioloop.IOLoop.current()
loop.call_later(CONNECTION_TIMEOUT, self._check_connected)
html = msg['content']['data']['text/html']
self.finish(html)

async def _check_connected(self):
Expand All @@ -291,7 +294,6 @@ async def _check_connected(self):
await self.kernel_manager.shutdown_kernel(kernel_id, now=True)



class PanelWSProxy(WSHandler, JupyterHandler):
"""
The PanelWSProxy serves as a proxy between the frontend and the
Expand Down Expand Up @@ -414,7 +416,9 @@ def on_close(self) -> None:
if self.session_id in state._kernels:
del state._kernels[self.session_id]
self._ping_job.stop()
reply = self.kernel.shutdown(reply=True)
future = self.kernel_manager.shutdown_kernel(self.kernel_id, now=True)
asyncio.ensure_future(reply)
asyncio.ensure_future(future)
self.kernel = None

Expand Down