Skip to content

Commit

Permalink
Swap sched for OutStream
Browse files Browse the repository at this point in the history
  • Loading branch information
Bago Amirbekian committed Mar 24, 2022
1 parent d6744f9 commit 1402d86
Showing 1 changed file with 32 additions and 111 deletions.
143 changes: 32 additions & 111 deletions ipykernel/iostream.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,18 @@
# Distributed under the terms of the Modified BSD License.

import atexit
from binascii import b2a_hex
from collections import deque
from imp import lock_held as import_lock_held
from itertools import count
import os
import sched
import sys
import threading
import time
import warnings
from weakref import WeakSet
import traceback
from io import StringIO, TextIOBase
import io

import zmq
if zmq.pyzmq_version_info() >= (17, 0):
from tornado.ioloop import IOLoop
else:
# deprecated since pyzmq 17
from zmq.eventloop.ioloop import IOLoop
from zmq.eventloop.zmqstream import ZMQStream

from jupyter_client.session import extract_header


Expand Down Expand Up @@ -63,13 +55,10 @@ def __init__(self, socket, pipe=False):
self.background_socket = BackgroundSocket(self)
self._master_pid = os.getpid()
self._pipe_flag = pipe
self.io_loop = IOLoop(make_current=False)
if pipe:
self._setup_pipe_in()
self._local = threading.local()
self._events = deque()
self._event_pipes = WeakSet()
self._setup_event_pipe()
self._scheduler = sched.scheduler(time.time, time.sleep)
self._scheduler_event = threading.Event()
self._event_sequencer = count()
self._alive = True
self.thread = threading.Thread(target=self._thread_main, name="IOPub")
self.thread.daemon = True
self.thread.pydev_do_not_trace = True
Expand All @@ -78,75 +67,16 @@ def __init__(self, socket, pipe=False):

def _thread_main(self):
"""The inner loop that's actually run in a thread"""
self.io_loop.make_current()
self.io_loop.start()
self.io_loop.close(all_fds=True)

def _setup_event_pipe(self):
"""Create the PULL socket listening for events that should fire in this thread."""
ctx = self.socket.context
pipe_in = ctx.socket(zmq.PULL)
pipe_in.linger = 0

_uuid = b2a_hex(os.urandom(16)).decode('ascii')
iface = self._event_interface = 'inproc://%s' % _uuid
pipe_in.bind(iface)
self._event_puller = ZMQStream(pipe_in, self.io_loop)
self._event_puller.on_recv(self._handle_event)

@property
def _event_pipe(self):
"""thread-local event pipe for signaling events that should be processed in the thread"""
try:
event_pipe = self._local.event_pipe
except AttributeError:
# new thread, new event pipe
ctx = self.socket.context
event_pipe = ctx.socket(zmq.PUSH)
event_pipe.linger = 0
event_pipe.connect(self._event_interface)
self._local.event_pipe = event_pipe
# WeakSet so that event pipes will be closed by garbage collection
# when their threads are terminated
self._event_pipes.add(event_pipe)
return event_pipe

def _handle_event(self, msg):
"""Handle an event on the event pipe
Content of the message is ignored.
Whenever *an* event arrives on the event stream,
*all* waiting events are processed in order.
"""
# freeze event count so new writes don't extend the queue
# while we are processing
n_events = len(self._events)
for i in range(n_events):
event_f = self._events.popleft()
event_f()

def _setup_pipe_in(self):
"""setup listening pipe for IOPub from forked subprocesses"""
ctx = self.socket.context

# use UUID to authenticate pipe messages
self._pipe_uuid = os.urandom(16)

pipe_in = ctx.socket(zmq.PULL)
pipe_in.linger = 0

try:
self._pipe_port = pipe_in.bind_to_random_port("tcp://127.0.0.1")
except zmq.ZMQError as e:
warnings.warn("Couldn't bind IOPub Pipe to 127.0.0.1: %s" % e +
"\nsubprocess output will be unavailable."
)
self._pipe_flag = False
pipe_in.close()
return
self._pipe_in = ZMQStream(pipe_in, self.io_loop)
self._pipe_in.on_recv(self._handle_pipe_msg)
scheduler_event = self._scheduler_event
scheduler = self._scheduler
while self._alive:
try:
delay = scheduler.run(False)
except Exception:
sys.__stderr__.write(traceback.format_exc())
delay = 0
if scheduler_event.wait(delay):
scheduler_event.clear()

def _handle_pipe_msg(self, msg):
"""handle a pipe message from a subprocess"""
Expand All @@ -157,14 +87,6 @@ def _handle_pipe_msg(self, msg):
return
self.send_multipart(msg[1:])

def _setup_pipe_out(self):
# must be new context after fork
ctx = zmq.Context()
pipe_out = ctx.socket(zmq.PUSH)
pipe_out.linger = 3000 # 3s timeout for pipe_out sends before discarding the message
pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
return ctx, pipe_out

def _is_master_process(self):
return os.getpid() == self._master_pid

Expand All @@ -187,13 +109,16 @@ def stop(self):
"""Stop the IOPub thread"""
if not self.thread.is_alive():
return
self.io_loop.add_callback(self.io_loop.stop)
scheduler = self._scheduler
self._alive = False
for event in scheduler.queue:
try:
scheduler.cancel(event)
except ValueError:
# Events probably started running, too late to cancel
pass
self._scheduler_event.set()
self.thread.join()
# close *all* event pipes, created in any thread
# event pipes can only be used from other threads while self.thread.is_alive()
# so after thread.join, this should be safe
for event_pipe in self._event_pipes:
event_pipe.close()

def close(self):
if self.closed:
Expand All @@ -205,15 +130,16 @@ def close(self):
def closed(self):
return self.socket is None

def schedule(self, f):
def schedule(self, f, delay=0.0):
"""Schedule a function to be called in our IO thread.
If the thread is not running, call immediately.
"""
if self.thread.is_alive():
self._events.append(f)
# wake event thread (message content is ignored)
self._event_pipe.send(b'')
# Event sequencer is used to work around event order bug in python versions <= 3.9
# https://github.com/python/cpython/pull/22729
self._scheduler.enter(delay, next(self._event_sequencer), f)
self._scheduler_event.set()
else:
f()

Expand Down Expand Up @@ -375,7 +301,6 @@ def __init__(
self._master_pid = os.getpid()
self._flush_pending = False
self._subprocess_flush_pending = False
self._io_loop = pub_thread.io_loop
self._new_buffer()
self.echo = None
self._isatty = bool(isatty)
Expand Down Expand Up @@ -444,11 +369,7 @@ def _schedule_flush(self):
if self._flush_pending:
return
self._flush_pending = True

# add_timeout has to be handed to the io thread via event pipe
def _schedule_in_thread():
self._io_loop.call_later(self.flush_interval, self._flush)
self.pub_thread.schedule(_schedule_in_thread)
self.pub_thread.schedule(self._flush, self.flush_interval)

def flush(self):
"""trigger actual zmq send
Expand Down

0 comments on commit 1402d86

Please sign in to comment.