Improve the busy/idle execution state tracking for kernels. #1429

Merged
merged 22 commits into from
Jul 18, 2024
Changes from 20 commits
Commits
67255a9
Fix the activity and execution-state tracking for kernels.
ojarjur Nov 17, 2023
30b45db
Improve kernel status tracking by matching idle status messages again…
ojarjur Nov 17, 2023
d2a952d
Merge branch 'main' of https://github.com/jupyter-server/jupyter_serv…
ojarjur Jun 4, 2024
4ce5539
Test fixes
ojarjur Jun 5, 2024
fadf5e8
Reduce the diff against the current code
ojarjur Jun 5, 2024
bdb9814
Add a test for execute_state
ojarjur Jun 5, 2024
f37557c
Fix lint warnings
ojarjur Jun 5, 2024
f1eb5a6
Fix race conditions and deadlocks in the test_execution_state tests
ojarjur Jun 6, 2024
c870f7e
Remove sleep that caused test failures on some jobs
ojarjur Jun 6, 2024
50c5f9c
Revert unexpected behavior change that affected tests on Windows
ojarjur Jun 6, 2024
982cb1e
Restore accidentally deleted pydoc
ojarjur Jun 6, 2024
4921682
Reduce the diff against the main branch and drop the unneeded, in-mem…
ojarjur Jun 6, 2024
6257ca1
Respect status messages that explicitly report a status of "starting"
ojarjur Jun 6, 2024
170cde0
Fix flakiness in the kernel test_execution_state test
ojarjur Jun 6, 2024
e583410
Make the kernel test_execution_state test more reliable by increasing…
ojarjur Jun 6, 2024
3ecc26d
Make kernel execution state test more reliable
ojarjur Jun 6, 2024
bcc2f1e
Make the test/test_utils.py test pass on Windows
ojarjur Jun 6, 2024
300008e
Fix a race condition in setting the initial kernel execution state
ojarjur Jun 7, 2024
101ebed
Simplify the retry logic for the kernel execution state test... inste…
ojarjur Jun 7, 2024
0e8bd97
Merge branch 'main' into ojarjur/fix-kernel-status
ojarjur Jul 10, 2024
54ea903
Switch from having a list of tracked message types for user activity …
ojarjur Jul 12, 2024
1b3ea06
Re-introduce retries in the execution status test to further reduce f…
ojarjur Jul 12, 2024
48 changes: 40 additions & 8 deletions jupyter_server/services/kernels/kernelmanager.py
@@ -232,11 +232,7 @@ async def _async_start_kernel( # type:ignore[override]
                 kwargs["kernel_id"] = kernel_id
             kernel_id = await self.pinned_superclass._async_start_kernel(self, **kwargs)
             self._kernel_connections[kernel_id] = 0
-            task = asyncio.create_task(self._finish_kernel_start(kernel_id))
-            if not getattr(self, "use_pending_kernels", None):
-                await task
-            else:
-                self._pending_kernel_tasks[kernel_id] = task
+
             # add busy/activity markers:
             kernel = self.get_kernel(kernel_id)
             kernel.execution_state = "starting" # type:ignore[attr-defined]
@@ -250,6 +246,12 @@ async def _async_start_kernel( # type:ignore[override]
             if env and isinstance(env, dict): # type:ignore[unreachable]
                 self.log.debug("Kernel argument 'env' passed with: %r", list(env.keys())) # type:ignore[unreachable]

+            task = asyncio.create_task(self._finish_kernel_start(kernel_id))
+            if not getattr(self, "use_pending_kernels", None):
+                await task
+            else:
+                self._pending_kernel_tasks[kernel_id] = task
+
             # Increase the metric of number of kernels running
             # for the relevant kernel type by 1
             KERNEL_CURRENTLY_RUNNING_TOTAL.labels(type=self._kernels[kernel_id].kernel_name).inc()
@@ -537,6 +539,24 @@ def _check_kernel_id(self, kernel_id):
             raise web.HTTPError(404, "Kernel does not exist: %s" % kernel_id)

     # monitoring activity:
+    tracked_message_types = List(
+        trait=Unicode(),
+        config=True,
+        default_value=[
+            "comm_close",
+            "comm_msg",
+            "comm_open",
+            "complete_request",
+            "is_complete_request",
+            "execute_input",
+            "execute_reply",
+            "execute_request",
+            "inspect_request",

This comment was marked as resolved.

Collaborator

I see it was also discussed in depth in #1360 (comment). I am not sure if there was a resolution though.

Contributor Author

> I am not sure if there was a resolution though.

We didn't come to a consensus on the set of message types to include, but I think we did get a pretty clear consensus that the set of message types should be configurable so that server admins can override whatever default we choose.

I don't know if we can get a consensus on the set of types to leave in the default value for the config, but if we do get a consensus that something should be added to (or removed from) this list, then please let me know.
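
Because the list is declared with `config=True`, a server admin could presumably override it from a config file. The following is a minimal sketch under two assumptions: that the trait keeps the name `tracked_message_types` shown in this revision of the diff (commit 54ea903 is titled "Switch from having a list of tracked message types…", so the name may differ in the merged code), and that it is set on `MappingKernelManager`. The chosen values are only an illustration.

```python
# jupyter_server_config.py
# Assumption: the trait name matches this revision of the diff.
c = get_config()  # noqa: F821 - injected by the Jupyter config loader

# Illustrative override: count only code execution as user activity.
c.MappingKernelManager.tracked_message_types = [
    "execute_request",
    "execute_reply",
]
```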

Member

As discussed on the call today, it might be more conservative to change this list to a not_tracked_message_types list, since there would be less risk of accidentally forgetting that a certain message type should count as kernel activity. It would also change the risk from "culling a kernel that shouldn't be culled" (bad because of potential data loss) to "not culling a kernel that can be culled" (bad because of potential cost / resource use); the former seems more disruptive.
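
The trade-off between the two policies can be illustrated with a small sketch. The function names, the two example sets, and the `future_request` message type are hypothetical and are not part of jupyter_server; the sketch only shows how each policy treats a message type that nobody anticipated.

```python
# Hypothetical illustration; neither function exists in jupyter_server.
TRACKED = {"execute_request", "complete_request", "inspect_request"}
NOT_TRACKED = {"status", "kernel_info_request", "kernel_info_reply"}


def counts_as_activity_allowlist(msg_type, tracked=TRACKED):
    """Allowlist policy: only explicitly listed types count as activity."""
    return msg_type in tracked


def counts_as_activity_denylist(msg_type, not_tracked=NOT_TRACKED):
    """Denylist policy: every type counts unless it is explicitly excluded."""
    return msg_type not in not_tracked


# "future_request" stands in for a message type that was forgotten, or that
# was added to the protocol after the lists were written.
unknown = "future_request"
print(counts_as_activity_allowlist(unknown))  # False: the kernel may be culled
print(counts_as_activity_denylist(unknown))   # True: the kernel is kept alive
```

With the allowlist, an omission errs toward culling a kernel that is in use; with the denylist, it errs toward keeping an idle kernel running.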

Contributor Author

I've put together a tentative list of what I think would be the default set of not-tracked message types.

I got this by going through this page, and selecting all of the message types that were either:

  1. Not sent on the shell channel, or
  2. Of the form *_info_(request|reply).

This is almost disjoint from the proposed list for tracked message types. The difference is that the execute_input message that was going to be tracked is included here in the not-tracked list because it is sent on the IOPub channel instead of the shell channel.

The full list is:

comm_info_request,
comm_info_reply,
kernel_info_request,
kernel_info_reply,
shutdown_request,
shutdown_reply,
interrupt_request,
interrupt_reply,
debug_request,
debug_reply,
stream,
display_data,
update_display_data,
execute_input,
execute_result,
error,
status,
clear_output,
debug_event,
input_request,
input_reply

@krassowski @vidartf @Zsailer Can you all please help me double check this?

In particular:

  1. Is my methodology described above the right approach to take?
  2. Does the list I came up with look right?

Thanks, in advance!
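
The two selection rules above can be applied mechanically. The sketch below is one way to do that; the channel assignments in `CHANNELS` are an assumption transcribed by hand from the Jupyter messaging documentation, limited to the message types named in this thread, and the variable names are illustrative.

```python
import re

# Assumed channel for each message type mentioned in this thread. The comm_*
# messages can also travel on IOPub; they are placed under "shell" because
# that is the channel a client uses to send them.
CHANNELS = {
    "shell": [
        "execute_request", "execute_reply", "inspect_request",
        "complete_request", "is_complete_request",
        "comm_open", "comm_msg", "comm_close",
        "comm_info_request", "comm_info_reply",
        "kernel_info_request", "kernel_info_reply",
    ],
    "control": [
        "shutdown_request", "shutdown_reply",
        "interrupt_request", "interrupt_reply",
        "debug_request", "debug_reply",
    ],
    "iopub": [
        "stream", "display_data", "update_display_data", "execute_input",
        "execute_result", "error", "status", "clear_output", "debug_event",
    ],
    "stdin": ["input_request", "input_reply"],
}
CHANNEL_OF = {t: channel for channel, types in CHANNELS.items() for t in types}

# Rule 2: message types of the form *_info_(request|reply).
INFO_PATTERN = re.compile(r".*_info_(request|reply)")


def is_not_tracked(msg_type):
    """Rule 1 (not sent on the shell channel) or rule 2 (info request/reply)."""
    not_on_shell = CHANNEL_OF[msg_type] != "shell"
    is_info = INFO_PATTERN.fullmatch(msg_type) is not None
    return not_on_shell or is_info


not_tracked = sorted(t for t in CHANNEL_OF if is_not_tracked(t))
print(len(not_tracked))
```

Under these assumptions the result has the same 21 entries as the list in the comment, including `execute_input`, which is excluded from tracking only because it is an IOPub message.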

Member

Thanks @ojarjur. I'll give this a closer look later today.

Member

This list looks right to me.

Thanks, @ojarjur. I believe this is good to go.

+        ],
+        help="""List of kernel message types included in user activity tracking.
+
+        This should be a subset of the message types sent on the shell channel.""",
+    )

     def start_watching_activity(self, kernel_id):
         """Start watching IOPub messages on a kernel for activity.
@@ -557,15 +577,27 @@ def start_watching_activity(self, kernel_id):

         def record_activity(msg_list):
             """Record an IOPub message arriving from a kernel"""
-            self.last_kernel_activity = kernel.last_activity = utcnow()
-
             idents, fed_msg_list = session.feed_identities(msg_list)
             msg = session.deserialize(fed_msg_list, content=False)

             msg_type = msg["header"]["msg_type"]
+            parent_msg_type = msg.get("parent_header", {}).get("msg_type", None)
+            if (
+                (msg_type in self.tracked_message_types)
+                or (parent_msg_type in self.tracked_message_types)
+                or kernel.execution_state == "busy"
+            ):
+                self.last_kernel_activity = kernel.last_activity = utcnow()
             if msg_type == "status":
                 msg = session.deserialize(fed_msg_list)
-                kernel.execution_state = msg["content"]["execution_state"]
+                execution_state = msg["content"]["execution_state"]
+                if parent_msg_type in self.tracked_message_types:
+                    kernel.execution_state = execution_state
+                elif kernel.execution_state == "starting" and execution_state != "starting":
+                    # We always normalize post-starting execution state to "idle"
+                    # unless we know that the status is in response to one of our
+                    # tracked message types.
+                    kernel.execution_state = "idle"
                 self.log.debug(
                     "activity on %s: %s (%s)",
                     kernel_id,
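
The update rule in `record_activity` can be exercised in isolation. The following is a minimal sketch, not the server's code: `KernelState`, `record`, and the `TRACKED` subset are hypothetical names introduced here, and message deserialization is replaced by plain arguments.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

# Illustrative subset of the tracked message types from the diff.
TRACKED = frozenset({"execute_request", "execute_reply", "complete_request"})


@dataclass
class KernelState:
    """Hypothetical stand-in for the attributes the server keeps per kernel."""
    execution_state: str = "starting"
    last_activity: Optional[datetime] = None


def record(kernel, msg_type, parent_msg_type, reported_state=None, tracked=TRACKED):
    """Apply the rule from the diff to one IOPub message."""
    # Activity is recorded for tracked messages, for replies to tracked
    # messages, and for anything that arrives while the kernel is busy.
    if msg_type in tracked or parent_msg_type in tracked or kernel.execution_state == "busy":
        kernel.last_activity = datetime.now(timezone.utc)
    if msg_type == "status":
        if parent_msg_type in tracked:
            kernel.execution_state = reported_state
        elif kernel.execution_state == "starting" and reported_state != "starting":
            # Any status for an untracked parent ends the "starting" phase as "idle".
            kernel.execution_state = "idle"
    return kernel


k = KernelState()
record(k, "status", "kernel_info_request", "busy")  # untracked parent: "starting" becomes "idle"
record(k, "status", "execute_request", "busy")      # tracked parent: state becomes "busy"
record(k, "status", "debug_request", "idle")        # untracked parent: state stays "busy"
print(k.execution_state)
record(k, "status", "execute_request", "idle")      # tracked parent: state becomes "idle"
print(k.execution_state)
```

The third call mirrors the scenario in the new execution-state test: an "idle" status for a `debug_request` does not override a "busy" state caused by an `execute_request`.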
79 changes: 79 additions & 0 deletions tests/services/kernels/test_cull.py
@@ -1,7 +1,9 @@
 import asyncio
+import datetime
 import json
 import os
 import platform
+import uuid
 import warnings

 import jupyter_client
@@ -94,6 +96,83 @@ async def test_cull_idle(jp_fetch, jp_ws_fetch):
     assert culled


+@pytest.mark.parametrize(
+    "jp_server_config",
+    [
+        # Test the synchronous case
+        Config(
+            {
+                "ServerApp": {
+                    "kernel_manager_class": "jupyter_server.services.kernels.kernelmanager.MappingKernelManager",
+                    "MappingKernelManager": {
+                        "cull_idle_timeout": CULL_TIMEOUT,
+                        "cull_interval": CULL_INTERVAL,
+                        "cull_connected": True,
+                    },
+                }
+            }
+        ),
+        # Test the async case
+        Config(
+            {
+                "ServerApp": {
+                    "kernel_manager_class": "jupyter_server.services.kernels.kernelmanager.AsyncMappingKernelManager",
+                    "AsyncMappingKernelManager": {
+                        "cull_idle_timeout": CULL_TIMEOUT,
+                        "cull_interval": CULL_INTERVAL,
+                        "cull_connected": True,
+                    },
+                }
+            }
+        ),
+    ],
+)
+async def test_cull_connected(jp_fetch, jp_ws_fetch):
+    r = await jp_fetch("api", "kernels", method="POST", allow_nonstandard_methods=True)
+    kernel = json.loads(r.body.decode())
+    kid = kernel["id"]
+
+    # Open a websocket connection.
+    ws = await jp_ws_fetch("api", "kernels", kid, "channels")
+    session_id = uuid.uuid1().hex
+    message_id = uuid.uuid1().hex
+    await ws.write_message(
+        json.dumps(
+            {
+                "channel": "shell",
+                "header": {
+                    "date": datetime.datetime.now(tz=datetime.timezone.utc).isoformat(),
+                    "session": session_id,
+                    "msg_id": message_id,
+                    "msg_type": "execute_request",
+                    "username": "",
+                    "version": "5.2",
+                },
+                "parent_header": {},
+                "metadata": {},
+                "content": {
+                    "code": f"import time\ntime.sleep({CULL_TIMEOUT-1})",
+                    "silent": False,
+                    "allow_stdin": False,
+                    "stop_on_error": True,
+                },
+                "buffers": [],
+            }
+        )
+    )
+
+    r = await jp_fetch("api", "kernels", kid, method="GET")
+    model = json.loads(r.body.decode())
+    assert model["connections"] == 1
+    culled = await get_cull_status(
+        kid, jp_fetch
+    )  # connected, but code cell still running. Should not be culled
+    assert not culled
+    culled = await get_cull_status(kid, jp_fetch)  # still connected, but idle... should be culled
+    assert culled
+    ws.close()
+
+
 async def test_cull_idle_disable(jp_fetch, jp_ws_fetch, jp_kernelspec_with_metadata):
     r = await jp_fetch("api", "kernels", method="POST", allow_nonstandard_methods=True)
     kernel = json.loads(r.body.decode())
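
The tests in this PR build the same websocket message envelope by hand several times. A hypothetical helper could factor that out; `make_ws_message` is not part of the PR or of jupyter_server, and this sketch only reproduces the fields that appear in the test messages.

```python
import datetime
import json
import uuid


def make_ws_message(channel, msg_type, content, session_id=None):
    """Hypothetical helper: serialize one message in the shape the tests send."""
    return json.dumps(
        {
            "channel": channel,
            "header": {
                "date": datetime.datetime.now(tz=datetime.timezone.utc).isoformat(),
                "session": session_id or uuid.uuid1().hex,
                "msg_id": uuid.uuid1().hex,
                "msg_type": msg_type,
                "username": "",
                "version": "5.2",
            },
            "parent_header": {},
            "metadata": {},
            "content": content,
            "buffers": [],
        }
    )


# Example: the execute_request sent by the culling test, with a fixed sleep.
payload = make_ws_message(
    "shell",
    "execute_request",
    {"code": "import time\ntime.sleep(1)", "silent": False,
     "allow_stdin": False, "stop_on_error": True},
)
print(json.loads(payload)["header"]["msg_type"])
```

A test that needs the `msg_id` afterwards, as the execution-state test does, would have to read it back from the serialized header or pass it in explicitly.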
122 changes: 122 additions & 0 deletions tests/services/kernels/test_execution_state.py
@@ -0,0 +1,122 @@
+import asyncio
+import datetime
+import json
+import os
+import platform
+import time
+import uuid
+import warnings
+
+import jupyter_client
+import pytest
+from flaky import flaky
+from tornado.httpclient import HTTPClientError
+from traitlets.config import Config
+
+
+@flaky
+async def test_execution_state(jp_fetch, jp_ws_fetch):
+    r = await jp_fetch("api", "kernels", method="POST", allow_nonstandard_methods=True)
+    kernel = json.loads(r.body.decode())
+    kid = kernel["id"]
+
+    # Open a websocket connection.
+    ws = await jp_ws_fetch("api", "kernels", kid, "channels")
+    session_id = uuid.uuid1().hex
+    message_id = uuid.uuid1().hex
+    await ws.write_message(
+        json.dumps(
+            {
+                "channel": "shell",
+                "header": {
+                    "date": datetime.datetime.now(tz=datetime.timezone.utc).isoformat(),
+                    "session": session_id,
+                    "msg_id": message_id,
+                    "msg_type": "execute_request",
+                    "username": "",
+                    "version": "5.2",
+                },
+                "parent_header": {},
+                "metadata": {},
+                "content": {
+                    "code": "while True:\n\tpass",
+                    "silent": False,
+                    "allow_stdin": False,
+                    "stop_on_error": True,
+                },
+                "buffers": [],
+            }
+        )
+    )
+    await poll_for_parent_message_status(kid, message_id, "busy", ws)
+    es = await get_execution_state(kid, jp_fetch)
+    assert es == "busy"
+
+    message_id_2 = uuid.uuid1().hex
+    await ws.write_message(
+        json.dumps(
+            {
+                "channel": "control",
+                "header": {
+                    "date": datetime.datetime.now(tz=datetime.timezone.utc).isoformat(),
+                    "session": session_id,
+                    "msg_id": message_id_2,
+                    "msg_type": "debug_request",
+                    "username": "",
+                    "version": "5.2",
+                },
+                "parent_header": {},
+                "metadata": {},
+                "content": {
+                    "type": "request",
+                    "command": "debugInfo",
+                },
+                "buffers": [],
+            }
+        )
+    )
+    await poll_for_parent_message_status(kid, message_id_2, "idle", ws)
+    es = await get_execution_state(kid, jp_fetch)
+
+    # Verify that the overall kernel status is still "busy" even though one
+    # "idle" response was already seen for the second execute request.
+    assert es == "busy"
+
+    await jp_fetch(
+        "api",
+        "kernels",
+        kid,
+        "interrupt",
+        method="POST",
+        allow_nonstandard_methods=True,
+    )
+
+    await poll_for_parent_message_status(kid, message_id, "idle", ws)
+    es = await get_execution_state(kid, jp_fetch)
+    assert es == "idle"
+    ws.close()
+
+
+async def get_execution_state(kid, jp_fetch):
+    r = await jp_fetch("api", "kernels", kid, method="GET")
+    model = json.loads(r.body.decode())
+    execution_state = model["execution_state"]
+    return execution_state
+
+
+async def poll_for_parent_message_status(kid, parent_message_id, target_status, ws):
+    while True:
+        resp = await ws.read_message()
+        resp_json = json.loads(resp)
+        print(resp_json)
+        parent_message = resp_json.get("parent_header", {}).get("msg_id", None)
+        if parent_message != parent_message_id:
+            continue
+
+        response_type = resp_json.get("header", {}).get("msg_type", None)
+        if response_type != "status":
+            continue
+
+        execution_state = resp_json.get("content", {}).get("execution_state", "")
+        if execution_state == target_status:
+            return
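
The polling helper loops until a matching status message arrives, so it would wait indefinitely if that message never came. One way to bound the wait is `asyncio.wait_for` from the standard library. The sketch below is an assumption about how that could look, not what the PR does; `FakeWebSocket` and `poll_for_status` are hypothetical names, with the fake standing in for the `jp_ws_fetch` connection.

```python
import asyncio
import json


async def poll_for_status(ws, parent_message_id, target_status):
    """Read messages until a status message with the given parent and state arrives."""
    while True:
        msg = json.loads(await ws.read_message())
        if msg.get("parent_header", {}).get("msg_id") != parent_message_id:
            continue
        if msg.get("header", {}).get("msg_type") != "status":
            continue
        if msg.get("content", {}).get("execution_state", "") == target_status:
            return


class FakeWebSocket:
    """Hypothetical stand-in for the test websocket; replays canned messages."""

    def __init__(self, messages):
        self._messages = list(messages)

    async def read_message(self):
        if not self._messages:
            # Simulate a silent connection: block until the caller cancels us.
            await asyncio.Event().wait()
        return json.dumps(self._messages.pop(0))


async def demo():
    busy = {
        "header": {"msg_type": "status"},
        "parent_header": {"msg_id": "abc"},
        "content": {"execution_state": "busy"},
    }
    ws = FakeWebSocket([busy])
    # The "busy" status is available, so this returns promptly.
    await asyncio.wait_for(poll_for_status(ws, "abc", "busy"), timeout=1)
    try:
        # No "idle" status is ever delivered, so the timeout fires.
        await asyncio.wait_for(poll_for_status(ws, "abc", "idle"), timeout=0.1)
    except asyncio.TimeoutError:
        return "timed out"
    return "found"


result = asyncio.run(demo())
print(result)  # timed out
```

A timeout turns a hung test into a clear failure, which may be easier to diagnose than a job that is killed by the CI runner.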