Skip to content

Commit

Permalink
update Jupyter tests (#51)
Browse files Browse the repository at this point in the history
* update Jupyter tests

* linting

* clean up test

* avoid mutating user input
  • Loading branch information
Matt Kafonek authored Oct 3, 2022
1 parent 71313c8 commit ec0e63c
Showing 1 changed file with 116 additions and 25 deletions.
141 changes: 116 additions & 25 deletions tests/test_jupyter_backend.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import os
import time
from typing import List

import pytest
from jupyter_client import manager
Expand All @@ -16,20 +17,80 @@ def ipykernel():
km.shutdown_kernel()


class JupyterMonitor:
"""
This can help progress an asyncio loop forward until certain messages have been seen
coming across the ZMQ socket, even across different channels.
"""

def __init__(self, mgr: JupyterKernelManager):
self.next_event = asyncio.Event()
self.last_seen_event: dict = None
mgr.register_callback(self.record_event)

async def record_event(self, event: dict):
self.last_seen_event = event
self.next_event.set()

async def run_until_seen(self, msg_types: List[str], timeout: float):
"""
await multiple `asyncio.wait_for`'s until we've observed all msg_types we expect
or an overall timeout has been reached. msg_types can contain dupes e.g. wait
to see 'status', 'execute_reply', and 'status' again.
"""
deadline = time.time() + timeout
msg_types = msg_types[:]
while msg_types:
max_wait = deadline - time.time()
await asyncio.wait_for(self.next_event.wait(), timeout=max_wait)
if self.last_seen_event["msg_type"] in msg_types:
msg_types.remove(self.last_seen_event["msg_type"])
self.next_event.clear()


class TestJupyterBackend:
async def test_jupyter_backend(self, mocker, ipykernel):
cb = mocker.MagicMock()
"""
Test basic execution in ipykernel. When we send an execute_request over to the Kernel,
we should observe a series of replies come over the shell and iopub channels.
- status (iopub) goes to busy
- execute_input (iopub) says the cell is running
- stream (iopub) has the cell output
- execute_reply (shell) says the cell is complete
- status (iopub) goes to idle
We can't guarentee order of messages received between channels -- execute_reply could
come before or after stream, and before or after the idle status.
"""
iopub_cb = mocker.MagicMock()
shell_cb = mocker.MagicMock()
mgr = JupyterKernelManager(ipykernel)
monitor = JupyterMonitor(mgr)
await mgr.initialize()
mgr.register_callback(cb, on_topic="iopub")

mgr.register_callback(iopub_cb, on_topic="iopub")
mgr.register_callback(shell_cb, on_topic="shell")

await mgr.subscribe_to_topic("iopub")
await mgr.subscribe_to_topic("shell")

# Execute a cell. Should expect 5 messages:
# - status (iopub), kernel going to busy
# - execute_input (iopub), cell is running
# - stream (iopub), cell output
# - execute_reply (shell), cell is complete
# - status (iopub), kernel going to idle
mgr.send("shell", "execute_request", {"code": "print('asdf')", "silent": False})
await asyncio.sleep(1)
await mgr._drain_queues()
await monitor.run_until_seen(
msg_types=["status", "execute_input", "execute_reply", "stream", "status"],
timeout=3,
)

# Quick sanity test for message ordering
iopub_msgs = cb.call_args_list
# the callbacks on iopub and shell channel should have gotten one argument from
# each message which is the jupyter message as a dict. Assert the order of
# received messages over iopub, but we can't guarentee order of received messages
# betwen shell / iopub channel (e.g. execute_reply could come before or after
# the kernel status goes back to idle)
iopub_msgs = iopub_cb.call_args_list
assert len(iopub_msgs) == 4
assert iopub_msgs[0].args[0]["msg_type"] == "status"
assert iopub_msgs[0].args[0]["content"]["execution_state"] == "busy"
Expand All @@ -38,39 +99,69 @@ async def test_jupyter_backend(self, mocker, ipykernel):
assert iopub_msgs[3].args[0]["msg_type"] == "status"
assert iopub_msgs[3].args[0]["content"]["execution_state"] == "idle"

cb.reset_mock()
shell_msgs = shell_cb.call_args_list
assert len(shell_msgs) == 1
assert shell_msgs[0].args[0]["msg_type"] == "execute_reply"

# If we reset the iopub callback list and unsubscribe from monitoring iopub,
# then when we execute another request we'll only see the messages on shell not
# anything on iopub
iopub_cb.reset_mock()
shell_cb.reset_mock()
await mgr.unsubscribe_from_topic("iopub")
mgr.send("shell", "execute_request", {"code": "print('asdf')", "silent": False})
await asyncio.sleep(1)
await monitor.run_until_seen(msg_types=["execute_reply"], timeout=3)
await mgr.shutdown()
cb.assert_not_called()
iopub_cb.assert_not_called()
shell_cb.assert_called_once()

@pytest.mark.xfail(reason="Cycling sockets is buggy in the current implementation")
async def test_reconnection(self, mocker, ipykernel):
iopub = mocker.MagicMock()
"""
Test that if a message over the zmq channel is too large, we won't receive it
but we will still be able to send further execute requests without issue.
"""
disconnect_event = mocker.MagicMock()

mgr = JupyterKernelManager(ipykernel, max_message_size=5 * 1024)
monitor = JupyterMonitor(mgr)

await mgr.initialize()
mgr.register_callback(iopub, on_topic="iopub")

mgr.register_callback(disconnect_event, on_system_event=SystemEvents.FORCED_DISCONNECT)
await mgr.subscribe_to_topic("iopub")
await mgr.subscribe_to_topic("shell")

mgr.send("shell", "execute_request", {"code": "print('asdf')", "silent": False})
await asyncio.sleep(1)
await mgr._drain_queues()
iopub.assert_called()
# Execute a cell and run until we've seen the normal expected messages.
mgr.send("shell", "execute_request", {"code": "print('foo')", "silent": False})
await monitor.run_until_seen(
msg_types=["status", "execute_input", "stream", "execute_reply", "status"],
timeout=3,
)

iopub.reset_mock()
# Execute a cell with output larger than our max message size. We should observe
# status going to busy, execute_input, then a disconnect event where we would normally
# see a stream. The iopub channel should cycle, and hopefully catch the status going
# idle. We'll also see execute_reply on shell channel.
mgr.send(
"shell", "execute_request", {"code": f"print('{os.urandom(2048)}')", "silent": False}
"shell",
"execute_request",
{"code": "print('x' * 2**13)", "silent": False},
)
await asyncio.sleep(1)
await mgr._drain_queues()
iopub.assert_called()
try:
await monitor.run_until_seen(
msg_types=["status", "execute_input", "execute_reply", "status"], timeout=3
)
except asyncio.TimeoutError:
await mgr.shutdown()
raise Exception("Did not see the expected messages after cycling the iopub channel")

disconnect_event.assert_called()

iopub.reset_mock()
mgr.send("shell", "execute_request", {"code": "print('asdf')", "silent": False})
await asyncio.sleep(3)
iopub.assert_called()
# Prove that after cycling the socket, normal executions work the same as always
mgr.send("shell", "execute_request", {"code": "print('baz')", "silent": False})
await monitor.run_until_seen(
msg_types=["status", "execute_input", "stream", "execute_reply", "status"],
timeout=3,
)
await mgr.shutdown()

0 comments on commit ec0e63c

Please sign in to comment.