Trac #28356: Enhanced new build_many to use on all platforms
In #27490 I hacked together a replacement for the
`sage_setup.docbuild.build_many` function, which implements a (blocking)
parallel map() of sorts. It solved some problems with using
`multiprocessing.Pool.map` that stem from its use of threads to fork
new processes.

That solved a problem specific to older versions of Cygwin. However,
there is a similar problem, affecting all platforms, when PARI is built
with multi-threading support: #26608.

Although the PARI problem calls for a more complete solution, at least
for the docbuild we can work around it by using the `build_many` from
#27490.

This ticket makes some fixes and enhancements to `build_many` so that it
can also return the results of the function it runs.  For the docbuild
this feature is not strictly needed, except that it can also be used (as
in `Pool.map`) to raise any exception that occurs in one of the worker
processes.  Thus, at least for the purposes of the docbuild, it is
closer in functionality to `Pool.map`.
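
As a rough illustration (not part of the commit message or the diff below),
the enhanced `build_many` can be used along the following lines. This assumes
a Sage build in which `sage_setup.docbuild.utils` is importable; the behaviour
shown (results returned in task order, worker exceptions re-raised in the
parent) is taken from the doctests added in the diff::

    from sage_setup.docbuild.utils import build_many

    def square(n):
        # Each task runs in its own worker process.
        return n * n

    # Blocks until all workers finish; results come back in task order,
    # so for the docbuild's purposes this can stand in for Pool.map.
    print(build_many(square, range(10), processes=4))
    # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

    def poison(n):
        # An exception raised in a worker is re-raised in the parent.
        if n == 3:
            raise ValueError("boom")
        return n

    try:
        build_many(poison, range(8), processes=4)
    except ValueError as exc:
        print("worker exception propagated:", exc)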

URL: https://trac.sagemath.org/28356
Reported by: embray
Ticket author(s): Erik Bray
Reviewer(s): Dima Pasechnik
Release Manager committed Sep 4, 2019
2 parents 246658e + e8d26b6 commit 7bbf965
Showing 2 changed files with 136 additions and 80 deletions.
45 changes: 9 additions & 36 deletions src/sage_setup/docbuild/__init__.py
@@ -59,7 +59,7 @@
import sage.all
from sage.misc.cachefunc import cached_method
from sage.misc.misc import sage_makedirs
from sage.env import DOT_SAGE, SAGE_DOC_SRC, SAGE_DOC, SAGE_SRC, CYGWIN_VERSION
from sage.env import SAGE_DOC_SRC, SAGE_DOC, SAGE_SRC

from .build_options import (LANGUAGES, SPHINXOPTS, PAPER, OMIT,
PAPEROPTS, ALLSPHINXOPTS, NUM_THREADS, WEBSITESPHINXOPTS,
@@ -270,44 +270,17 @@ def clean(self, *args):
inventory = builder_helper('inventory')


def _build_many(target, args):
# Pool() uses an actual fork() to run each new instance. This is
# important for performance reasons, i.e., don't use a forkserver when
# it becomes available with Python 3: Here, sage is already initialized
# which is quite costly, with a forkserver we would have to
# reinitialize it for every document we build. At the same time, don't
# serialize this by taking the pool (and thus the call to fork()) out
# completely: The call to Sphinx leaks memory, so we need to build each
# document in its own process to control the RAM usage.
from multiprocessing import Pool
pool = Pool(NUM_THREADS, maxtasksperchild=1)
# map_async handles KeyboardInterrupt correctly. Plain map and
# apply_async does not, so don't use it.
x = pool.map_async(target, args, 1)
from .utils import build_many as _build_many
def build_many(target, args):
"""
Thin wrapper around `sage_setup.docbuild.utils.build_many` which uses the
docbuild settings ``NUM_THREADS`` and ``ABORT_ON_ERROR``.
"""
try:
ret = x.get(99999)
pool.close()
pool.join()
except Exception:
pool.terminate()
_build_many(target, args, processes=NUM_THREADS)
except BaseException as exc:
if ABORT_ON_ERROR:
raise
return ret

if (os.environ.get('SAGE_PARI_CFG', '') !='') and (not (CYGWIN_VERSION and CYGWIN_VERSION[0] < 3)):
build_many = _build_many
else:
# Cygwin 64-bit < 3.0.0 has a bug with exception handling when exceptions
# occur in pthreads, so it's dangerous to use multiprocessing.Pool, as
# signals can't be properly handled in worker processes, and they can crash
# causing the docbuild to hang. But where are these pthreads, you ask?
# Well, multiprocessing.Pool runs a thread from which it starts new worker
# processes when old workers complete/die, so the worker processes behave
# as though they were started from a pthread, even after fork(), and are
# actually susceptible to this bug. As a workaround, here's a naïve but
# good-enough "pool" replacement that does not use threads; see
# https://trac.sagemath.org/ticket/27214#comment:25 for further discussion.
from .utils import _build_many as build_many


##########################################
171 changes: 127 additions & 44 deletions src/sage_setup/docbuild/utils.py
@@ -7,37 +7,48 @@
class WorkerDiedException(RuntimeError):
"""Raised if a worker process dies unexpected."""

def __init__(self, message, original_exception=None):
super(WorkerDiedException, self).__init__(message)
self.original_exception = original_exception

def _build_many(target, args, processes=None):

def build_many(target, args, processes=None):
"""
Map a list of arguments in ``args`` to a single-argument target function
``target`` in parallel using ``NUM_THREADS`` (or ``processes`` if given)
simultaneous processes.
``target`` in parallel using ``multiprocessing.cpu_count()`` (or
``processes`` if given) simultaneous processes.
This is a simplified version of ``multiprocessing.Pool.map`` from the
Python standard library which avoids a couple of its pitfalls. In
particular, it can abort (with a `RuntimeError`) without hanging if one of
the worker processes unexpectedly dies. It also avoids starting new
processes from a pthread, which is known to result in bugs on versions of
Cygwin prior to 3.0.0 (see
https://trac.sagemath.org/ticket/27214#comment:25).
the worker processes unexpectedly dies. It also has semantics equivalent
to ``maxtasksperchild=1``; that is, one process is started per argument.
As such, this is inefficient for processing large numbers of fast tasks,
but appropriate for running longer tasks (such as doc builds) which may
also require significant cleanup.
It also avoids starting new processes from a pthread, which results in at
least two known issues:
On the other hand, unlike ``multiprocessing.Pool.map`` it does not return
a result. This is fine for the purpose of building multiple Sphinx
documents in parallel.
* On versions of Cygwin prior to 3.0.0 there were bugs in mmap handling
on threads (see https://trac.sagemath.org/ticket/27214#comment:25).
* When PARI is built with multi-threading support, forking a Sage
process from a thread leaves the main Pari interface instance broken
(see https://trac.sagemath.org/ticket/26608#comment:38).
In the future this may be replaced by a generalized version of the more
robust parallel processing implementation from ``sage.doctest.forker``.
EXAMPLES::
sage: from sage_setup.docbuild.utils import _build_many
sage: from sage_setup.docbuild.utils import build_many
sage: def target(N):
....: import time
....: time.sleep(float(0.1))
....: print('Processed task %s' % N)
....:
sage: _build_many(target, range(8), processes=8)
sage: _ = build_many(target, range(8), processes=8)
Processed task ...
Processed task ...
Processed task ...
@@ -47,13 +58,23 @@ def _build_many(target, args, processes=None):
Processed task ...
Processed task ...
If one of the worker processes errors out from an unhandled exception, or
otherwise exits non-zero (e.g. killed by a signal) any in-progress tasks
will be completed gracefully, but then a `RuntimeError` is raised and
pending tasks are not started::
Unlike the first version of `build_many` which was only intended to get
around the Cygwin bug, this version can also return a result, and thus can
be used as a replacement for `multiprocessing.Pool.map` (i.e. it still
blocks until the result is ready)::
sage: def square(N):
....: return N * N
sage: build_many(square, range(100))
[0, 1, 4, 9, ..., 9604, 9801]
If the target function raises an exception in any of the workers,
`build_many` raises that exception and all other results are discarded.
Any in-progress tasks may still be allowed to complete gracefully before
the exception is raised::
sage: def target(N):
....: import time
....: import time, os, signal
....: if N == 4:
....: # Task 4 is a poison pill
....: 1 / 0
@@ -67,48 +88,86 @@
traceback from the failing process on stderr. However, due to how the
doctest runner works, the doctest will only expect the final exception::
sage: _build_many(target, range(8), processes=8)
sage: build_many(target, range(8), processes=8)
Traceback (most recent call last):
...
WorkerDiedException: worker for 4 died with non-zero exit code 1
ZeroDivisionError: rational division by zero
Similarly, if one of the worker processes dies unexpectedly or otherwise
exits non-zero (e.g. killed by a signal), any in-progress tasks will be
completed gracefully, but then a `RuntimeError` is raised and pending tasks
are not started::
sage: def target(N):
....: import time, os, signal
....: if N == 4:
....: # Task 4 is a poison pill
....: os.kill(os.getpid(), signal.SIGKILL)
....: else:
....: time.sleep(0.5)
....: print('Processed task %s' % N)
....:
sage: build_many(target, range(8), processes=8)
Traceback (most recent call last):
...
WorkerDiedException: worker for 4 died with non-zero exit code -9
"""
from multiprocessing import Process
from .build_options import NUM_THREADS, ABORT_ON_ERROR
from multiprocessing import Process, Queue, cpu_count
from six.moves.queue import Empty

if processes is None:
processes = NUM_THREADS
processes = cpu_count()

workers = [None] * processes
queue = list(args)

# Maps worker process PIDs to the name of the document it's working
# on (the argument it was passed). This is primarily used just for
# debugging/information purposes.
jobs = {}
tasks = enumerate(args)
results = []
result_queue = Queue()

### Utility functions ###
def run_worker(target, queue, idx, task):
try:
result = target(task)
except BaseException as exc:
queue.put((None, exc))
else:
queue.put((idx, result))

def bring_out_yer_dead(w, exitcode):
def bring_out_yer_dead(w, task, exitcode):
"""
Handle a dead / completed worker. Raises WorkerDiedError if it
Handle a dead / completed worker. Raises WorkerDiedException if it
returned with a non-zero exit code.
"""

if w is None or exitcode is None:
# I'm not dead yet! (or I haven't even been born yet)
return w
return (w, task)

# Hack: If we wait()ed on this worker manually we have to tell it
# it's dead:
if w._popen.returncode is None:
w._popen.returncode = exitcode

if exitcode != 0 and ABORT_ON_ERROR:
if exitcode != 0:
raise WorkerDiedException(
"worker for {} died with non-zero exit code "
"{}".format(jobs[w.pid], w.exitcode))
"{}".format(task[1], w.exitcode))

# Get result from the queue; depending on ordering this may not be
# *the* result for this worker, but for each completed worker there
# should be *a* result so let's get it
try:
result = result_queue.get_nowait()
except Empty:
# Generally shouldn't happen but could in case of a race condition;
# don't worry we'll collect any remaining results at the end.
pass

if result[0] is None:
# Indicates that an exception occurred in the target function
raise WorkerDiedException('', original_exception=result[1])
else:
results.append(result)

jobs.pop(w.pid)
# Helps multiprocessing with some internal bookkeeping
w.join()

@@ -147,20 +206,28 @@ def reap_workers(waited_pid=None, waited_exitcode=None):

for idx, w in enumerate(workers):
if w is not None:
w, task = w
if w.pid == waited_pid:
exitcode = waited_exitcode
else:
exitcode = w.exitcode

w = bring_out_yer_dead(w, exitcode)
w = bring_out_yer_dead(w, task, exitcode)

# Worker w is dead/not started, so start a new worker
# in its place with the next document from the queue
if w is None and queue:
job = queue.pop(0)
w = Process(target=target, args=(job,))
w.start()
jobs[w.pid] = job
if w is None:
try:
task = next(tasks)
except StopIteration:
pass
else:
w = Process(target=run_worker,
args=((target, result_queue) + task))
w.start()
# Pair the new worker with the task it's performing (mostly
# for debugging purposes)
w = (w, task)

workers[idx] = w

@@ -197,18 +264,34 @@ def reap_workers(waited_pid=None, waited_exitcode=None):
finally:
try:
remaining_workers = [w for w in workers if w is not None]
for w in remaining_workers:
for w, _ in remaining_workers:
# Give any remaining workers a chance to shut down gracefully
try:
w.terminate()
except OSError as exc:
if exc.errno != errno.ESRCH:
# Otherwise it was already dead so this was expected
raise
for w in remaining_workers:
for w, _ in remaining_workers:
w.join()
finally:
if worker_exc is not None:
# Re-raise the RuntimeError from bring_out_yer_dead set if a
# worker died unexpectedly
raise worker_exc
# worker died unexpectedly, or the original exception if it's
# wrapping one
if worker_exc.original_exception:
raise worker_exc.original_exception
else:
raise worker_exc

# All workers should be shut down by now and should have completed without
# error. No new items will be added to the result queue, so we can get all
# the remaining results, if any.
while True:
try:
results.append(result_queue.get_nowait())
except Empty:
break

# Return the results sorted according to their original task order
return [r[1] for r in sorted(results, key=lambda r: r[0])]
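
For readers skimming the diff, the following is a stripped-down, standalone
sketch (not the Sage code itself) of the Process-plus-Queue pattern the new
`build_many` above is built around: one worker process per task, results and
worker exceptions reported back through a queue, and non-zero exit codes
turned into errors. The names `simple_build_many` and `_run` are invented for
this illustration, and the bookkeeping is much simpler than in the real
implementation::

    import multiprocessing as mp

    def _run(target, queue, idx, task):
        # Worker wrapper: report either (index, result) or (None, exception).
        try:
            queue.put((idx, target(task)))
        except BaseException as exc:
            queue.put((None, exc))

    def simple_build_many(target, args):
        queue = mp.Queue()
        workers = [mp.Process(target=_run, args=(target, queue, i, a))
                   for i, a in enumerate(args)]
        for w in workers:
            w.start()
        # Small per-task results will not fill the queue's pipe buffer, so in
        # this sketch it is safe to join all workers before draining results.
        for w in workers:
            w.join()
            if w.exitcode != 0:
                raise RuntimeError("worker died with exit code %s"
                                   % w.exitcode)
        results = []
        while not queue.empty():
            results.append(queue.get())
        for idx, value in results:
            if idx is None:
                raise value  # re-raise the worker's exception in the parent
        # Return results in the original task order, like the real build_many.
        return [value for _, value in sorted(results)]

    def square(n):
        return n * n

    if __name__ == '__main__':
        print(simple_build_many(square, range(5)))  # [0, 1, 4, 9, 16]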
