diff --git a/jupyter_server/serverapp.py b/jupyter_server/serverapp.py index ff193dba7e..1feafaee6d 100644 --- a/jupyter_server/serverapp.py +++ b/jupyter_server/serverapp.py @@ -147,6 +147,11 @@ except ImportError: terminado_available = False + class TerminalManager: + # Compatible with terminal_manager_class configuration + ... + + # ----------------------------------------------------------------------------- # Module globals # ----------------------------------------------------------------------------- @@ -178,6 +183,7 @@ # Added for backwards compatibility from classic notebook server. DEFAULT_SERVER_PORT = DEFAULT_JUPYTER_SERVER_PORT + # ----------------------------------------------------------------------------- # Helper functions # ----------------------------------------------------------------------------- @@ -351,6 +357,7 @@ def init_settings( server_root_dir=root_dir, jinja2_env=env, terminals_available=terminado_available and jupyter_app.terminals_enabled, + terminal_manager_class=jupyter_app.terminal_manager_class, serverapp=jupyter_app, ) @@ -525,7 +532,6 @@ def shutdown_server(server_info, timeout=5, log=None): class JupyterServerStopApp(JupyterApp): - version = __version__ description = "Stop currently running Jupyter server for a given port" @@ -665,7 +671,6 @@ def start(self): """, ) - # Add notebook manager flags flags.update( boolean_flag( @@ -694,13 +699,13 @@ def start(self): } ) + # ----------------------------------------------------------------------------- # ServerApp # ----------------------------------------------------------------------------- class ServerApp(JupyterApp): - name = "jupyter-server" version = __version__ description = _i18n( @@ -1671,6 +1676,19 @@ def _update_server_extensions(self, change): ), ) + terminal_manager_class = Type( + default_value=TerminalManager, + klass=TerminalManager if terminado_available else object, + config=True, + help=_i18n( + """The terminal manager class to use. + + Only when terminals_enabled is instantiated, + the call to init_terminals function will get self.terminal_manager + """ + ), + ) + # Since use of terminals is also a function of whether the terminado package is # available, this variable holds the "final indication" of whether terminal functionality # should be considered (particularly during shutdown/cleanup). It is enabled only diff --git a/jupyter_server/terminal/__init__.py b/jupyter_server/terminal/__init__.py index 8bac278403..e4e4561d70 100644 --- a/jupyter_server/terminal/__init__.py +++ b/jupyter_server/terminal/__init__.py @@ -12,7 +12,7 @@ from jupyter_server.utils import url_path_join as ujoin from . import api_handlers from .handlers import TermSocket -from .terminalmanager import TerminalManager +from .terminalmanager import TerminalManager, StatefulTerminalManager def initialize(webapp, root_dir, connection_url, settings): @@ -29,7 +29,9 @@ def initialize(webapp, root_dir, connection_url, settings): # the user has specifically set a preferred shell command. if os.name != "nt" and shell_override is None and not sys.stdout.isatty(): shell.append("-l") - terminal_manager = webapp.settings["terminal_manager"] = TerminalManager( + terminal_manager = webapp.settings["terminal_manager"] = webapp.settings[ + "terminal_manager_class" + ]( shell_command=shell, extra_env={ "JUPYTER_SERVER_ROOT": root_dir, diff --git a/jupyter_server/terminal/handlers.py b/jupyter_server/terminal/handlers.py index e56c780dcb..fcfc76d030 100644 --- a/jupyter_server/terminal/handlers.py +++ b/jupyter_server/terminal/handlers.py @@ -27,6 +27,7 @@ def get(self, *args, **kwargs): def on_message(self, message): super(TermSocket, self).on_message(message) self._update_activity() + self._set_state_busy_if_stateful() def write_message(self, message, binary=False): super(TermSocket, self).write_message(message, binary=binary) @@ -37,3 +38,9 @@ def _update_activity(self): # terminal may not be around on deletion/cull if self.term_name in self.terminal_manager.terminals: self.terminal_manager.terminals[self.term_name].last_activity = utcnow() + + def _set_state_busy_if_stateful(self): + if not hasattr(self.terminal_manager, "set_state_idle_if_return"): + return + if self.term_name in self.terminal_manager.terminals: + self.terminal_manager.terminals[self.term_name].execution_state = "busy" diff --git a/jupyter_server/terminal/terminalmanager.py b/jupyter_server/terminal/terminalmanager.py index cfbfea8e4c..cee732c457 100644 --- a/jupyter_server/terminal/terminalmanager.py +++ b/jupyter_server/terminal/terminalmanager.py @@ -7,6 +7,7 @@ from datetime import timedelta import terminado +from terminado.management import _poll from tornado import web from tornado.ioloop import IOLoop from tornado.ioloop import PeriodicCallback @@ -102,7 +103,7 @@ def get_terminal_model(self, name): def _check_terminal(self, name): """Check a that terminal 'name' exists and raise 404 if not.""" if name not in self.terminals: - raise web.HTTPError(404, u"Terminal not found: %s" % name) + raise web.HTTPError(404, "Terminal not found: %s" % name) def _initialize_culler(self): """Start culler if 'cull_inactive_timeout' is greater than zero. @@ -165,3 +166,62 @@ async def _cull_inactive_terminal(self, name): "Culling terminal '%s' due to %s seconds of inactivity.", name, inactivity ) await self.terminate(name, force=True) + + +class StatefulTerminalManager(TerminalManager): + """ + ***Experimental*** + Patch execution_state into terminal + The method of setting the state is not necessarily reliable, the terminal of bus y state will still be culled + """ + + def pty_read(self, fd, events=None): + """Called by the event loop when there is pty data ready to read.""" + # prevent blocking on fd + if not _poll(fd, timeout=0.1): # 100ms + self.log.debug(f"Spurious pty_read() on fd {fd}") + return + ptywclients = self.ptys_by_fd[fd] + try: + s = ptywclients.ptyproc.read(65536) + client_list = ptywclients.clients + ptywclients.read_buffer.append(s) + # hook for set terminal status + self.set_state_idle_if_return(ptywclients, s) + if not client_list: + # No one to consume our output: buffer it. + ptywclients.preopen_buffer.append(s) + return + for client in ptywclients.clients: + client.on_pty_read(s) + except EOFError: + self.on_eof(ptywclients) + for client in ptywclients.clients: + client.on_pty_died() + + def set_state_idle_if_return(self, ptywclients, s): + first_stdout = getattr(ptywclients, "first_stdout", "") + + if not first_stdout: + # Record the first output to identify the terminal return + # It works well for jupyterhub-singleuser and should also work for other debian-based mirrors + # fixme: May fail if terminal is not properly separated with ':' or change user after connect + # (Any change to the user, hostname or environment may render it invalid) + first_stdout = s.split(":")[0].lstrip() + ptywclients.first_stdout = first_stdout + self.log.debug(f'take "{first_stdout}" as terminal returned') + if s.lstrip().startswith(first_stdout): + self._set_state_idle(ptywclients) + + def _set_state_idle(self, ptywclients): + self.log.debug("set terminal execution_state as idle") + ptywclients.execution_state = "idle" + + def get_terminal_model(self, name): + """Return a JSON-safe dict representing a terminal. + For use in representing terminals in the JSON APIs. + """ + model = super(StatefulTerminalManager, self).get_terminal_model(name) + term = self.terminals[name] + model.setdefault("execution_state", getattr(term, "execution_state", "not connected yet")) + return model diff --git a/jupyter_server/tests/test_stateful_terminal.py b/jupyter_server/tests/test_stateful_terminal.py new file mode 100644 index 0000000000..72549f504d --- /dev/null +++ b/jupyter_server/tests/test_stateful_terminal.py @@ -0,0 +1,119 @@ +import asyncio +import json +import os +import platform +import shutil + +import pytest +from traitlets.config import Config + + +@pytest.fixture +def terminal_path(tmp_path): + subdir = tmp_path.joinpath("terminal_path") + subdir.mkdir() + + yield subdir + + shutil.rmtree(str(subdir), ignore_errors=True) + + +CULL_TIMEOUT = 10 +CULL_INTERVAL = 3 + + +@pytest.fixture +def jp_server_config(): + return Config( + { + "ServerApp": { + "terminal_manager_class": "jupyter_server.terminal.StatefulTerminalManager", + } + } + ) + + +@pytest.mark.skipif(platform.system().lower() != "linux", reason="Only available on Linux") +async def test_set_idle(jp_fetch, jp_ws_fetch, jp_cleanup_subprocesses, jp_serverapp): + # disable man sudo_root + os.system(f"touch {os.path.expanduser('~/.sudo_as_admin_successful')}") + + resp = await jp_fetch( + "api", + "terminals", + method="POST", + allow_nonstandard_methods=True, + ) + term = json.loads(resp.body.decode()) + term_1 = term["name"] + ws = await jp_ws_fetch("terminals", "websocket", term_1) + setup = ["set_size", 0, 0, 80, 32] + await ws.write_message(json.dumps(setup)) + while True: + try: + await asyncio.wait_for(ws.read_message(), timeout=1) + except asyncio.TimeoutError: + break + sleep_2_msg = ["stdin", "python -c 'import time;time.sleep(2)'\r\n"] + await ws.write_message(json.dumps(sleep_2_msg)) + await asyncio.sleep(1) + assert ( + jp_serverapp.web_app.settings["terminal_manager"].terminals[term_1].execution_state + == "busy" + ) + await asyncio.sleep(2) + message_stdout = "" + while True: + try: + message = await asyncio.wait_for(ws.read_message(), timeout=5.0) + except asyncio.TimeoutError: + break + + message = json.loads(message) + + if message[0] == "stdout": + message_stdout += message[1] + print(message_stdout) + assert ( + jp_serverapp.web_app.settings["terminal_manager"].terminals[term_1].execution_state + == "idle" + ) + await jp_cleanup_subprocesses() + + +@pytest.mark.skipif(platform.system().lower() != "linux", reason="Only available on Linux") +async def test_set_idle_disconnect(jp_fetch, jp_ws_fetch, jp_cleanup_subprocesses, jp_serverapp): + # disable man sudo_root + os.system(f"touch {os.path.expanduser('~/.sudo_as_admin_successful')}") + + resp = await jp_fetch( + "api", + "terminals", + method="POST", + allow_nonstandard_methods=True, + ) + term = json.loads(resp.body.decode()) + term_1 = term["name"] + ws = await jp_ws_fetch("terminals", "websocket", term_1) + setup = ["set_size", 0, 0, 80, 32] + await ws.write_message(json.dumps(setup)) + while True: + try: + await asyncio.wait_for(ws.read_message(), timeout=1) + except asyncio.TimeoutError: + break + sleep_2_msg = ["stdin", "python -c 'import time;time.sleep(2)'\r\n"] + await ws.write_message(json.dumps(sleep_2_msg)) + ws.close() + await asyncio.sleep(1) + assert ( + jp_serverapp.web_app.settings["terminal_manager"].terminals[term_1].execution_state + == "busy" + ) + await asyncio.sleep(2) + assert not jp_serverapp.web_app.settings["terminal_manager"].terminals[term_1].clients + assert ( + jp_serverapp.web_app.settings["terminal_manager"].terminals[term_1].execution_state + == "idle" + ) + await jp_cleanup_subprocesses()