From 2ddbcf212eea14d62efecf734729fa9716573707 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Sun, 2 Feb 2020 12:25:01 +0100 Subject: [PATCH 1/6] Add async API --- jupyter_client/__init__.py | 1 + jupyter_client/asynchronous/__init__.py | 1 + jupyter_client/asynchronous/channels.py | 295 ++++++++++++++++++ jupyter_client/asynchronous/client.py | 392 ++++++++++++++++++++++++ jupyter_client/channels.py | 7 +- 5 files changed, 695 insertions(+), 1 deletion(-) create mode 100644 jupyter_client/asynchronous/__init__.py create mode 100644 jupyter_client/asynchronous/channels.py create mode 100644 jupyter_client/asynchronous/client.py diff --git a/jupyter_client/__init__.py b/jupyter_client/__init__.py index ead3a16f3..f680b3d6a 100644 --- a/jupyter_client/__init__.py +++ b/jupyter_client/__init__.py @@ -6,4 +6,5 @@ from .client import KernelClient from .manager import KernelManager, run_kernel from .blocking import BlockingKernelClient +from .asynchronous import AsyncKernelClient from .multikernelmanager import MultiKernelManager diff --git a/jupyter_client/asynchronous/__init__.py b/jupyter_client/asynchronous/__init__.py new file mode 100644 index 000000000..19a739f05 --- /dev/null +++ b/jupyter_client/asynchronous/__init__.py @@ -0,0 +1 @@ +from .client import AsyncKernelClient diff --git a/jupyter_client/asynchronous/channels.py b/jupyter_client/asynchronous/channels.py new file mode 100644 index 000000000..5d759cd52 --- /dev/null +++ b/jupyter_client/asynchronous/channels.py @@ -0,0 +1,295 @@ +"""Base classes to manage a Client's interaction with a running kernel""" + +# Copyright (c) Jupyter Development Team. +# Distributed under the terms of the Modified BSD License. + +from __future__ import absolute_import + +import atexit +import errno +from threading import Thread, Event +import time +import asyncio + +import zmq +# import ZMQError in top-level namespace, to avoid ugly attribute-error messages +# during garbage collection of threads at exit: +from zmq import ZMQError + +from jupyter_client import protocol_version_info + +from ..channelsabc import HBChannelABC + +try: + from queue import Queue, Empty # Py 3 +except ImportError: + from Queue import Queue, Empty # Py 2 + + +class ZMQSocketChannel(object): + """A ZMQ socket in a simple async API""" + session = None + socket = None + stream = None + _exiting = False + proxy_methods = [] + + def __init__(self, socket, session, loop=None): + """Create a channel. + + Parameters + ---------- + socket : :class:`zmq.asyncio.Socket` + The ZMQ socket to use. + session : :class:`session.Session` + The session to use. + loop + Unused here, for other implementations + """ + super(ZMQSocketChannel, self).__init__() + + self.socket = socket + self.session = session + + async def _recv(self, **kwargs): + msg = await self.socket.recv_multipart(**kwargs) + ident,smsg = self.session.feed_identities(msg) + return self.session.deserialize(smsg) + + async def get_msg(self, timeout=None): + """ Gets a message if there is one that is ready. """ + if timeout is not None: + timeout *= 1000 # seconds to ms + ready = await self.socket.poll(timeout) + + if ready: + return await self._recv() + else: + raise Empty + + async def get_msgs(self): + """ Get all messages that are currently ready. """ + msgs = [] + while True: + try: + msgs.append(await self.get_msg()) + except Empty: + break + return msgs + + async def msg_ready(self): + """ Is there a message that has been received? """ + return bool(await self.socket.poll(timeout=0)) + + def close(self): + if self.socket is not None: + try: + self.socket.close(linger=0) + except Exception: + pass + self.socket = None + stop = close + + def is_alive(self): + return (self.socket is not None) + + def send(self, msg): + """Pass a message to the ZMQ socket to send + """ + self.session.send(self.socket, msg) + + def start(self): + pass +#----------------------------------------------------------------------------- +# Constants and exceptions +#----------------------------------------------------------------------------- + +major_protocol_version = protocol_version_info[0] + +class InvalidPortNumber(Exception): + pass + +class HBChannel(Thread): + """The heartbeat channel which monitors the kernel heartbeat. + + Note that the heartbeat channel is paused by default. As long as you start + this channel, the kernel manager will ensure that it is paused and un-paused + as appropriate. + """ + context = None + session = None + socket = None + address = None + _exiting = False + + time_to_dead = 1. + poller = None + _running = None + _pause = None + _beating = None + + def __init__(self, context=None, session=None, address=None, loop=None): + """Create the heartbeat monitor thread. + + Parameters + ---------- + context : :class:`zmq.Context` + The ZMQ context to use. + session : :class:`session.Session` + The session to use. + address : zmq url + Standard (ip, port) tuple that the kernel is listening on. + """ + super(HBChannel, self).__init__() + self.daemon = True + + self.loop = loop + + self.context = context + self.session = session + if isinstance(address, tuple): + if address[1] == 0: + message = 'The port number for a channel cannot be 0.' + raise InvalidPortNumber(message) + address = "tcp://%s:%i" % address + self.address = address + + # running is False until `.start()` is called + self._running = False + self._exit = Event() + # don't start paused + self._pause = False + self.poller = zmq.Poller() + + @staticmethod + @atexit.register + def _notice_exit(): + # Class definitions can be torn down during interpreter shutdown. + # We only need to set _exiting flag if this hasn't happened. + if HBChannel is not None: + HBChannel._exiting = True + + def _create_socket(self): + if self.socket is not None: + # close previous socket, before opening a new one + self.poller.unregister(self.socket) + self.socket.close() + self.socket = self.context.socket(zmq.REQ) + self.socket.linger = 1000 + self.socket.connect(self.address) + + self.poller.register(self.socket, zmq.POLLIN) + + def _poll(self, start_time): + """poll for heartbeat replies until we reach self.time_to_dead. + + Ignores interrupts, and returns the result of poll(), which + will be an empty list if no messages arrived before the timeout, + or the event tuple if there is a message to receive. + """ + + until_dead = self.time_to_dead - (time.time() - start_time) + # ensure poll at least once + until_dead = max(until_dead, 1e-3) + events = [] + while True: + try: + events = self.poller.poll(1000 * until_dead) + except ZMQError as e: + if e.errno == errno.EINTR: + # ignore interrupts during heartbeat + # this may never actually happen + until_dead = self.time_to_dead - (time.time() - start_time) + until_dead = max(until_dead, 1e-3) + pass + else: + raise + except Exception: + if self._exiting: + break + else: + raise + else: + break + return events + + def run(self): + """The thread's main activity. Call start() instead.""" + if self.loop is not None: + asyncio.set_event_loop(self.loop) + self._create_socket() + self._running = True + self._beating = True + + while self._running: + if self._pause: + # just sleep, and skip the rest of the loop + self._exit.wait(self.time_to_dead) + continue + + since_last_heartbeat = 0.0 + # no need to catch EFSM here, because the previous event was + # either a recv or connect, which cannot be followed by EFSM + self.socket.send(b'ping') + request_time = time.time() + ready = self._poll(request_time) + if ready: + self._beating = True + # the poll above guarantees we have something to recv + self.socket.recv() + # sleep the remainder of the cycle + remainder = self.time_to_dead - (time.time() - request_time) + if remainder > 0: + self._exit.wait(remainder) + continue + else: + # nothing was received within the time limit, signal heart failure + self._beating = False + since_last_heartbeat = time.time() - request_time + self.call_handlers(since_last_heartbeat) + # and close/reopen the socket, because the REQ/REP cycle has been broken + self._create_socket() + continue + + def pause(self): + """Pause the heartbeat.""" + self._pause = True + + def unpause(self): + """Unpause the heartbeat.""" + self._pause = False + + def is_beating(self): + """Is the heartbeat running and responsive (and not paused).""" + if self.is_alive() and not self._pause and self._beating: + return True + else: + return False + + def stop(self): + """Stop the channel's event loop and join its thread.""" + self._running = False + self._exit.set() + self.join() + self.close() + + def close(self): + if self.socket is not None: + try: + self.socket.close(linger=0) + except Exception: + pass + self.socket = None + + def call_handlers(self, since_last_heartbeat): + """This method is called in the ioloop thread when a message arrives. + + Subclasses should override this method to handle incoming messages. + It is important to remember that this method is called in the thread + so that some logic must be done to ensure that the application level + handlers are called in the application thread. + """ + pass + + +HBChannelABC.register(HBChannel) diff --git a/jupyter_client/asynchronous/client.py b/jupyter_client/asynchronous/client.py new file mode 100644 index 000000000..c523478e7 --- /dev/null +++ b/jupyter_client/asynchronous/client.py @@ -0,0 +1,392 @@ +"""Implements an async kernel client. +""" +# Copyright (c) Jupyter Development Team. +# Distributed under the terms of the Modified BSD License. + +from __future__ import print_function + +from functools import partial +from getpass import getpass +try: + from queue import Empty # Python 3 +except ImportError: + from Queue import Empty # Python 2 +import sys +import time + +import zmq +import zmq.asyncio +import asyncio + +from traitlets import (Type, Instance) +from jupyter_client.channels import HBChannel +from jupyter_client.client import KernelClient +from .channels import ZMQSocketChannel + +try: + monotonic = time.monotonic +except AttributeError: + # py2 + monotonic = time.time # close enough + +try: + TimeoutError +except NameError: + # py2 + TimeoutError = RuntimeError + + +def reqrep(meth, channel='shell'): + def wrapped(self, *args, **kwargs): + reply = kwargs.pop('reply', False) + timeout = kwargs.pop('timeout', None) + msg_id = meth(self, *args, **kwargs) + if not reply: + return msg_id + + return self._recv_reply(msg_id, timeout=timeout, channel=channel) + + if not meth.__doc__: + # python -OO removes docstrings, + # so don't bother building the wrapped docstring + return wrapped + + basedoc, _ = meth.__doc__.split('Returns\n', 1) + parts = [basedoc.strip()] + if 'Parameters' not in basedoc: + parts.append(""" + Parameters + ---------- + """) + parts.append(""" + reply: bool (default: False) + Whether to wait for and return reply + timeout: float or None (default: None) + Timeout to use when waiting for a reply + + Returns + ------- + msg_id: str + The msg_id of the request sent, if reply=False (default) + reply: dict + The reply message for this request, if reply=True + """) + wrapped.__doc__ = '\n'.join(parts) + return wrapped + +class AsyncKernelClient(KernelClient): + """A KernelClient with async APIs + + ``get_[channel]_msg()`` methods wait for and return messages on channels, + raising :exc:`queue.Empty` if no message arrives within ``timeout`` seconds. + """ + + # The PyZMQ Context to use for communication with the kernel. + context = Instance(zmq.asyncio.Context) + def _context_default(self): + return zmq.asyncio.Context() + + #-------------------------------------------------------------------------- + # Channel proxy methods + #-------------------------------------------------------------------------- + + async def get_shell_msg(self, *args, **kwargs): + """Get a message from the shell channel""" + return await self.shell_channel.get_msg(*args, **kwargs) + + async def get_iopub_msg(self, *args, **kwargs): + """Get a message from the iopub channel""" + return await self.iopub_channel.get_msg(*args, **kwargs) + + async def get_stdin_msg(self, *args, **kwargs): + """Get a message from the stdin channel""" + return await self.stdin_channel.get_msg(*args, **kwargs) + + async def get_control_msg(self, *args, **kwargs): + """Get a message from the control channel""" + return await self.control_channel.get_msg(*args, **kwargs) + + @property + def hb_channel(self): + """Get the hb channel object for this kernel.""" + if self._hb_channel is None: + url = self._make_url('hb') + self.log.debug("connecting heartbeat channel to %s", url) + loop = asyncio.new_event_loop() + self._hb_channel = self.hb_channel_class( + self.context, self.session, url, loop + ) + return self._hb_channel + + async def wait_for_ready(self, timeout=None): + """Waits for a response when a client is blocked + + - Sets future time for timeout + - Blocks on shell channel until a message is received + - Exit if the kernel has died + - If client times out before receiving a message from the kernel, send RuntimeError + - Flush the IOPub channel + """ + if timeout is None: + abs_timeout = float('inf') + else: + abs_timeout = time.time() + timeout + + from ..manager import KernelManager + if not isinstance(self.parent, KernelManager): + # This Client was not created by a KernelManager, + # so wait for kernel to become responsive to heartbeats + # before checking for kernel_info reply + while not self.is_alive(): + if time.time() > abs_timeout: + raise RuntimeError("Kernel didn't respond to heartbeats in %d seconds and timed out" % timeout) + time.sleep(0.2) + + # Wait for kernel info reply on shell channel + while True: + try: + msg = await self.shell_channel.get_msg(timeout=1) + except Empty: + pass + else: + if msg['msg_type'] == 'kernel_info_reply': + self._handle_kernel_info_reply(msg) + break + + if not self.is_alive(): + raise RuntimeError('Kernel died before replying to kernel_info') + + # Check if current time is ready check time plus timeout + if time.time() > abs_timeout: + raise RuntimeError("Kernel didn't respond in %d seconds" % timeout) + + # Flush IOPub channel + while True: + try: + msg = await self.iopub_channel.get_msg(timeout=0.2) + except Empty: + break + + # The classes to use for the various channels + shell_channel_class = Type(ZMQSocketChannel) + iopub_channel_class = Type(ZMQSocketChannel) + stdin_channel_class = Type(ZMQSocketChannel) + hb_channel_class = Type(HBChannel) + control_channel_class = Type(ZMQSocketChannel) + + + async def _recv_reply(self, msg_id, timeout=None, channel='shell'): + """Receive and return the reply for a given request""" + if timeout is not None: + deadline = monotonic() + timeout + while True: + if timeout is not None: + timeout = max(0, deadline - monotonic()) + try: + if channel == 'control': + reply = await self.get_control_msg(timeout=timeout) + else: + reply = await self.get_shell_msg(timeout=timeout) + except Empty: + raise TimeoutError("Timeout waiting for reply") + if reply['parent_header'].get('msg_id') != msg_id: + # not my reply, someone may have forgotten to retrieve theirs + continue + return reply + + + # replies come on the shell channel + execute = reqrep(KernelClient.execute) + history = reqrep(KernelClient.history) + complete = reqrep(KernelClient.complete) + inspect = reqrep(KernelClient.inspect) + kernel_info = reqrep(KernelClient.kernel_info) + comm_info = reqrep(KernelClient.comm_info) + + # replies come on the control channel + shutdown = reqrep(KernelClient.shutdown, channel='control') + + + def _stdin_hook_default(self, msg): + """Handle an input request""" + content = msg['content'] + if content.get('password', False): + prompt = getpass + elif sys.version_info < (3,): + prompt = raw_input + else: + prompt = input + + try: + raw_data = prompt(content["prompt"]) + except EOFError: + # turn EOFError into EOF character + raw_data = '\x04' + except KeyboardInterrupt: + sys.stdout.write('\n') + return + + # only send stdin reply if there *was not* another request + # or execution finished while we were reading. + if not (self.stdin_channel.msg_ready() or self.shell_channel.msg_ready()): + self.input(raw_data) + + def _output_hook_default(self, msg): + """Default hook for redisplaying plain-text output""" + msg_type = msg['header']['msg_type'] + content = msg['content'] + if msg_type == 'stream': + stream = getattr(sys, content['name']) + stream.write(content['text']) + elif msg_type in ('display_data', 'execute_result'): + sys.stdout.write(content['data'].get('text/plain', '')) + elif msg_type == 'error': + print('\n'.join(content['traceback']), file=sys.stderr) + + def _output_hook_kernel(self, session, socket, parent_header, msg): + """Output hook when running inside an IPython kernel + + adds rich output support. + """ + msg_type = msg['header']['msg_type'] + if msg_type in ('display_data', 'execute_result', 'error'): + session.send(socket, msg_type, msg['content'], parent=parent_header) + else: + self._output_hook_default(msg) + + async def execute_interactive(self, code, silent=False, store_history=True, + user_expressions=None, allow_stdin=None, stop_on_error=True, + timeout=None, output_hook=None, stdin_hook=None, + ): + """Execute code in the kernel interactively + + Output will be redisplayed, and stdin prompts will be relayed as well. + If an IPython kernel is detected, rich output will be displayed. + + You can pass a custom output_hook callable that will be called + with every IOPub message that is produced instead of the default redisplay. + + .. versionadded:: 5.0 + + Parameters + ---------- + code : str + A string of code in the kernel's language. + + silent : bool, optional (default False) + If set, the kernel will execute the code as quietly possible, and + will force store_history to be False. + + store_history : bool, optional (default True) + If set, the kernel will store command history. This is forced + to be False if silent is True. + + user_expressions : dict, optional + A dict mapping names to expressions to be evaluated in the user's + dict. The expression values are returned as strings formatted using + :func:`repr`. + + allow_stdin : bool, optional (default self.allow_stdin) + Flag for whether the kernel can send stdin requests to frontends. + + Some frontends (e.g. the Notebook) do not support stdin requests. + If raw_input is called from code executed from such a frontend, a + StdinNotImplementedError will be raised. + + stop_on_error: bool, optional (default True) + Flag whether to abort the execution queue, if an exception is encountered. + + timeout: float or None (default: None) + Timeout to use when waiting for a reply + + output_hook: callable(msg) + Function to be called with output messages. + If not specified, output will be redisplayed. + + stdin_hook: callable(msg) + Function to be called with stdin_request messages. + If not specified, input/getpass will be called. + + Returns + ------- + reply: dict + The reply message for this request + """ + if not self.iopub_channel.is_alive(): + raise RuntimeError("IOPub channel must be running to receive output") + if allow_stdin is None: + allow_stdin = self.allow_stdin + if allow_stdin and not self.stdin_channel.is_alive(): + raise RuntimeError("stdin channel must be running to allow input") + msg_id = await self.execute(code, + silent=silent, + store_history=store_history, + user_expressions=user_expressions, + allow_stdin=allow_stdin, + stop_on_error=stop_on_error, + ) + if stdin_hook is None: + stdin_hook = self._stdin_hook_default + if output_hook is None: + # detect IPython kernel + if 'IPython' in sys.modules: + from IPython import get_ipython + ip = get_ipython() + in_kernel = getattr(ip, 'kernel', False) + if in_kernel: + output_hook = partial( + self._output_hook_kernel, + ip.display_pub.session, + ip.display_pub.pub_socket, + ip.display_pub.parent_header, + ) + if output_hook is None: + # default: redisplay plain-text outputs + output_hook = self._output_hook_default + + # set deadline based on timeout + if timeout is not None: + deadline = monotonic() + timeout + else: + timeout_ms = None + + poller = zmq.Poller() + iopub_socket = self.iopub_channel.socket + poller.register(iopub_socket, zmq.POLLIN) + if allow_stdin: + stdin_socket = self.stdin_channel.socket + poller.register(stdin_socket, zmq.POLLIN) + else: + stdin_socket = None + + # wait for output and redisplay it + while True: + if timeout is not None: + timeout = max(0, deadline - monotonic()) + timeout_ms = 1e3 * timeout + events = dict(poller.poll(timeout_ms)) + if not events: + raise TimeoutError("Timeout waiting for output") + if stdin_socket in events: + req = await self.stdin_channel.get_msg(timeout=0) + stdin_hook(req) + continue + if iopub_socket not in events: + continue + + msg = await self.iopub_channel.get_msg(timeout=0) + + if msg['parent_header'].get('msg_id') != msg_id: + # not from my request + continue + output_hook(msg) + + # stop on idle + if msg['header']['msg_type'] == 'status' and \ + msg['content']['execution_state'] == 'idle': + break + + # output is done, get the reply + if timeout is not None: + timeout = max(0, deadline - monotonic()) + return await self._recv_reply(msg_id, timeout=timeout) diff --git a/jupyter_client/channels.py b/jupyter_client/channels.py index 7892de47e..fca30fc83 100644 --- a/jupyter_client/channels.py +++ b/jupyter_client/channels.py @@ -9,6 +9,7 @@ import errno from threading import Thread, Event import time +import asyncio import zmq # import ZMQError in top-level namespace, to avoid ugly attribute-error messages @@ -47,7 +48,7 @@ class HBChannel(Thread): _pause = None _beating = None - def __init__(self, context=None, session=None, address=None): + def __init__(self, context=None, session=None, address=None, loop=None): """Create the heartbeat monitor thread. Parameters @@ -62,6 +63,8 @@ def __init__(self, context=None, session=None, address=None): super(HBChannel, self).__init__() self.daemon = True + self.loop = loop + self.context = context self.session = session if isinstance(address, tuple): @@ -132,6 +135,8 @@ def _poll(self, start_time): def run(self): """The thread's main activity. Call start() instead.""" + if self.loop is not None: + asyncio.set_event_loop(self.loop) self._create_socket() self._running = True self._beating = True From 9376af308fc3338cb38c001d03496fd74d863dd2 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Sun, 2 Feb 2020 12:40:51 +0100 Subject: [PATCH 2/6] Remove leftover --- jupyter_client/asynchronous/channels.py | 210 ------------------------ 1 file changed, 210 deletions(-) diff --git a/jupyter_client/asynchronous/channels.py b/jupyter_client/asynchronous/channels.py index 5d759cd52..bd973b13a 100644 --- a/jupyter_client/asynchronous/channels.py +++ b/jupyter_client/asynchronous/channels.py @@ -3,23 +3,6 @@ # Copyright (c) Jupyter Development Team. # Distributed under the terms of the Modified BSD License. -from __future__ import absolute_import - -import atexit -import errno -from threading import Thread, Event -import time -import asyncio - -import zmq -# import ZMQError in top-level namespace, to avoid ugly attribute-error messages -# during garbage collection of threads at exit: -from zmq import ZMQError - -from jupyter_client import protocol_version_info - -from ..channelsabc import HBChannelABC - try: from queue import Queue, Empty # Py 3 except ImportError: @@ -100,196 +83,3 @@ def send(self, msg): def start(self): pass -#----------------------------------------------------------------------------- -# Constants and exceptions -#----------------------------------------------------------------------------- - -major_protocol_version = protocol_version_info[0] - -class InvalidPortNumber(Exception): - pass - -class HBChannel(Thread): - """The heartbeat channel which monitors the kernel heartbeat. - - Note that the heartbeat channel is paused by default. As long as you start - this channel, the kernel manager will ensure that it is paused and un-paused - as appropriate. - """ - context = None - session = None - socket = None - address = None - _exiting = False - - time_to_dead = 1. - poller = None - _running = None - _pause = None - _beating = None - - def __init__(self, context=None, session=None, address=None, loop=None): - """Create the heartbeat monitor thread. - - Parameters - ---------- - context : :class:`zmq.Context` - The ZMQ context to use. - session : :class:`session.Session` - The session to use. - address : zmq url - Standard (ip, port) tuple that the kernel is listening on. - """ - super(HBChannel, self).__init__() - self.daemon = True - - self.loop = loop - - self.context = context - self.session = session - if isinstance(address, tuple): - if address[1] == 0: - message = 'The port number for a channel cannot be 0.' - raise InvalidPortNumber(message) - address = "tcp://%s:%i" % address - self.address = address - - # running is False until `.start()` is called - self._running = False - self._exit = Event() - # don't start paused - self._pause = False - self.poller = zmq.Poller() - - @staticmethod - @atexit.register - def _notice_exit(): - # Class definitions can be torn down during interpreter shutdown. - # We only need to set _exiting flag if this hasn't happened. - if HBChannel is not None: - HBChannel._exiting = True - - def _create_socket(self): - if self.socket is not None: - # close previous socket, before opening a new one - self.poller.unregister(self.socket) - self.socket.close() - self.socket = self.context.socket(zmq.REQ) - self.socket.linger = 1000 - self.socket.connect(self.address) - - self.poller.register(self.socket, zmq.POLLIN) - - def _poll(self, start_time): - """poll for heartbeat replies until we reach self.time_to_dead. - - Ignores interrupts, and returns the result of poll(), which - will be an empty list if no messages arrived before the timeout, - or the event tuple if there is a message to receive. - """ - - until_dead = self.time_to_dead - (time.time() - start_time) - # ensure poll at least once - until_dead = max(until_dead, 1e-3) - events = [] - while True: - try: - events = self.poller.poll(1000 * until_dead) - except ZMQError as e: - if e.errno == errno.EINTR: - # ignore interrupts during heartbeat - # this may never actually happen - until_dead = self.time_to_dead - (time.time() - start_time) - until_dead = max(until_dead, 1e-3) - pass - else: - raise - except Exception: - if self._exiting: - break - else: - raise - else: - break - return events - - def run(self): - """The thread's main activity. Call start() instead.""" - if self.loop is not None: - asyncio.set_event_loop(self.loop) - self._create_socket() - self._running = True - self._beating = True - - while self._running: - if self._pause: - # just sleep, and skip the rest of the loop - self._exit.wait(self.time_to_dead) - continue - - since_last_heartbeat = 0.0 - # no need to catch EFSM here, because the previous event was - # either a recv or connect, which cannot be followed by EFSM - self.socket.send(b'ping') - request_time = time.time() - ready = self._poll(request_time) - if ready: - self._beating = True - # the poll above guarantees we have something to recv - self.socket.recv() - # sleep the remainder of the cycle - remainder = self.time_to_dead - (time.time() - request_time) - if remainder > 0: - self._exit.wait(remainder) - continue - else: - # nothing was received within the time limit, signal heart failure - self._beating = False - since_last_heartbeat = time.time() - request_time - self.call_handlers(since_last_heartbeat) - # and close/reopen the socket, because the REQ/REP cycle has been broken - self._create_socket() - continue - - def pause(self): - """Pause the heartbeat.""" - self._pause = True - - def unpause(self): - """Unpause the heartbeat.""" - self._pause = False - - def is_beating(self): - """Is the heartbeat running and responsive (and not paused).""" - if self.is_alive() and not self._pause and self._beating: - return True - else: - return False - - def stop(self): - """Stop the channel's event loop and join its thread.""" - self._running = False - self._exit.set() - self.join() - self.close() - - def close(self): - if self.socket is not None: - try: - self.socket.close(linger=0) - except Exception: - pass - self.socket = None - - def call_handlers(self, since_last_heartbeat): - """This method is called in the ioloop thread when a message arrives. - - Subclasses should override this method to handle incoming messages. - It is important to remember that this method is called in the thread - so that some logic must be done to ensure that the application level - handlers are called in the application thread. - """ - pass - - -HBChannelABC.register(HBChannel) From df108819d5572faa41bb23f2194f7c85313c9fd8 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Sun, 2 Feb 2020 12:45:29 +0100 Subject: [PATCH 3/6] Fix docstring --- jupyter_client/asynchronous/channels.py | 4 ++-- jupyter_client/asynchronous/client.py | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/jupyter_client/asynchronous/channels.py b/jupyter_client/asynchronous/channels.py index bd973b13a..a18c2d2e9 100644 --- a/jupyter_client/asynchronous/channels.py +++ b/jupyter_client/asynchronous/channels.py @@ -1,4 +1,4 @@ -"""Base classes to manage a Client's interaction with a running kernel""" +"""Async channels""" # Copyright (c) Jupyter Development Team. # Distributed under the terms of the Modified BSD License. @@ -10,7 +10,7 @@ class ZMQSocketChannel(object): - """A ZMQ socket in a simple async API""" + """A ZMQ socket in an async API""" session = None socket = None stream = None diff --git a/jupyter_client/asynchronous/client.py b/jupyter_client/asynchronous/client.py index c523478e7..85abed893 100644 --- a/jupyter_client/asynchronous/client.py +++ b/jupyter_client/asynchronous/client.py @@ -1,5 +1,4 @@ -"""Implements an async kernel client. -""" +"""Implements an async kernel client""" # Copyright (c) Jupyter Development Team. # Distributed under the terms of the Modified BSD License. From 5270b61098320a6c74def4dc7b33d2eb1fe45901 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Sun, 2 Feb 2020 12:47:14 +0100 Subject: [PATCH 4/6] Drop python2 --- .travis.yml | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/.travis.yml b/.travis.yml index 6bac02ce3..fd88a8d32 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,9 +1,10 @@ language: python python: - - 3.6 - - 3.5 - - 2.7 -sudo: false + - "nightly" + - "3.8-dev" + - "3.7" + - "3.6" + - "3.5" install: - pip install --upgrade setuptools pip - pip install --upgrade --upgrade-strategy eager --pre -e .[test] pytest-cov pytest-warnings codecov From af40cb674de8e78536bf579d194d03f4c444cd46 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Tue, 4 Feb 2020 09:53:53 +0100 Subject: [PATCH 5/6] Drop python2 fallback --- jupyter_client/asynchronous/channels.py | 5 +---- jupyter_client/asynchronous/client.py | 26 +++++-------------------- 2 files changed, 6 insertions(+), 25 deletions(-) diff --git a/jupyter_client/asynchronous/channels.py b/jupyter_client/asynchronous/channels.py index a18c2d2e9..395ea99cc 100644 --- a/jupyter_client/asynchronous/channels.py +++ b/jupyter_client/asynchronous/channels.py @@ -3,10 +3,7 @@ # Copyright (c) Jupyter Development Team. # Distributed under the terms of the Modified BSD License. -try: - from queue import Queue, Empty # Py 3 -except ImportError: - from Queue import Queue, Empty # Py 2 +from queue import Queue, Empty class ZMQSocketChannel(object): diff --git a/jupyter_client/asynchronous/client.py b/jupyter_client/asynchronous/client.py index 85abed893..93358a53e 100644 --- a/jupyter_client/asynchronous/client.py +++ b/jupyter_client/asynchronous/client.py @@ -22,18 +22,6 @@ from jupyter_client.client import KernelClient from .channels import ZMQSocketChannel -try: - monotonic = time.monotonic -except AttributeError: - # py2 - monotonic = time.time # close enough - -try: - TimeoutError -except NameError: - # py2 - TimeoutError = RuntimeError - def reqrep(meth, channel='shell'): def wrapped(self, *args, **kwargs): @@ -177,10 +165,10 @@ async def wait_for_ready(self, timeout=None): async def _recv_reply(self, msg_id, timeout=None, channel='shell'): """Receive and return the reply for a given request""" if timeout is not None: - deadline = monotonic() + timeout + deadline = time.monotonic() + timeout while True: if timeout is not None: - timeout = max(0, deadline - monotonic()) + timeout = max(0, deadline - time.monotonic()) try: if channel == 'control': reply = await self.get_control_msg(timeout=timeout) @@ -211,8 +199,6 @@ def _stdin_hook_default(self, msg): content = msg['content'] if content.get('password', False): prompt = getpass - elif sys.version_info < (3,): - prompt = raw_input else: prompt = input @@ -265,8 +251,6 @@ async def execute_interactive(self, code, silent=False, store_history=True, You can pass a custom output_hook callable that will be called with every IOPub message that is produced instead of the default redisplay. - .. versionadded:: 5.0 - Parameters ---------- code : str @@ -345,7 +329,7 @@ async def execute_interactive(self, code, silent=False, store_history=True, # set deadline based on timeout if timeout is not None: - deadline = monotonic() + timeout + deadline = time.monotonic() + timeout else: timeout_ms = None @@ -361,7 +345,7 @@ async def execute_interactive(self, code, silent=False, store_history=True, # wait for output and redisplay it while True: if timeout is not None: - timeout = max(0, deadline - monotonic()) + timeout = max(0, deadline - time.monotonic()) timeout_ms = 1e3 * timeout events = dict(poller.poll(timeout_ms)) if not events: @@ -387,5 +371,5 @@ async def execute_interactive(self, code, silent=False, store_history=True, # output is done, get the reply if timeout is not None: - timeout = max(0, deadline - monotonic()) + timeout = max(0, deadline - time.monotonic()) return await self._recv_reply(msg_id, timeout=timeout) From 69224f2bb3775472cfeb19b0294080d31d20490f Mon Sep 17 00:00:00 2001 From: David Brochart Date: Tue, 4 Feb 2020 10:42:58 +0100 Subject: [PATCH 6/6] Drop more python2 stuff --- jupyter_client/asynchronous/client.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/jupyter_client/asynchronous/client.py b/jupyter_client/asynchronous/client.py index 93358a53e..1a0df29b1 100644 --- a/jupyter_client/asynchronous/client.py +++ b/jupyter_client/asynchronous/client.py @@ -2,14 +2,9 @@ # Copyright (c) Jupyter Development Team. # Distributed under the terms of the Modified BSD License. -from __future__ import print_function - from functools import partial from getpass import getpass -try: - from queue import Empty # Python 3 -except ImportError: - from Queue import Empty # Python 2 +from queue import Empty import sys import time