From 65c7563f0f1957deb8ea876ce387f99852b16129 Mon Sep 17 00:00:00 2001
From: "Erik M. Bray"
Date: Sat, 1 Jan 2011 21:21:00 +0000
Subject: [PATCH 1/2] Enhance the new build_many to actually return results
 (including exceptions).

Now it can be used even as a somewhat more general parallel map.
---
 src/sage_setup/docbuild/utils.py | 171 +++++++++++++++++++++++--------
 1 file changed, 127 insertions(+), 44 deletions(-)

diff --git a/src/sage_setup/docbuild/utils.py b/src/sage_setup/docbuild/utils.py
index 3f7142dc3db..272f7e2d0f3 100644
--- a/src/sage_setup/docbuild/utils.py
+++ b/src/sage_setup/docbuild/utils.py
@@ -7,37 +7,48 @@
 class WorkerDiedException(RuntimeError):
     """Raised if a worker process dies unexpected."""
+    def __init__(self, message, original_exception=None):
+        super(WorkerDiedException, self).__init__(message)
+        self.original_exception = original_exception
 
-def _build_many(target, args, processes=None):
+
+def build_many(target, args, processes=None):
     """
     Map a list of arguments in ``args`` to a single-argument target function
-    ``target`` in parallel using ``NUM_THREADS`` (or ``processes`` if given)
-    simultaneous processes.
+    ``target`` in parallel using ``multiprocessing.cpu_count()`` (or
+    ``processes`` if given) simultaneous processes.
 
     This is a simplified version of ``multiprocessing.Pool.map`` from the
     Python standard library which avoids a couple of its pitfalls.  In
     particular, it can abort (with a `RuntimeError`) without hanging if one of
-    the worker processes unexpectedly dies.  It also avoids starting new
-    processes from a pthread, which is known to result in bugs on versions of
-    Cygwin prior to 3.0.0 (see
-    https://trac.sagemath.org/ticket/27214#comment:25).
+    the worker processes unexpectedly dies.  It also has semantics equivalent
+    to ``maxtasksperchild=1``; that is, one process is started per argument.
+    As such, this is inefficient for processing large numbers of fast tasks,
+    but appropriate for running longer tasks (such as doc builds) which may
+    also require significant cleanup.
+
+    It also avoids starting new processes from a pthread, which results in at
+    least two known issues:
 
-    On the other hand, unlike ``multiprocessing.Pool.map`` it does not return
-    a result.  This is fine for the purpose of building multiple Sphinx
-    documents in parallel.
+    * On versions of Cygwin prior to 3.0.0 there were bugs in mmap handling
+      on threads (see https://trac.sagemath.org/ticket/27214#comment:25).
+
+    * When PARI is built with multi-threading support, forking a Sage
+      process from a thread leaves the main Pari interface instance broken
+      (see https://trac.sagemath.org/ticket/26608#comment:38).
 
     In the future this may be replaced by a generalized version of the more
     robust parallel processing implementation from ``sage.doctest.forker``.
 
     EXAMPLES::
 
-        sage: from sage_setup.docbuild.utils import _build_many
+        sage: from sage_setup.docbuild.utils import build_many
         sage: def target(N):
         ....:     import time
         ....:     time.sleep(float(0.1))
         ....:     print('Processed task %s' % N)
         ....:
-        sage: _build_many(target, range(8), processes=8)
+        sage: _ = build_many(target, range(8), processes=8)
         Processed task ...
         Processed task ...
         Processed task ...
         Processed task ...
@@ -47,13 +58,23 @@ def _build_many(target, args, processes=None):
         Processed task ...
         Processed task ...
 
-    If one of the worker processes errors out from an unhandled exception, or
-    otherwise exits non-zero (e.g. killed by a signal) any in-progress tasks
-    will be completed gracefully, but then a `RuntimeError` is raised and
-    pending tasks are not started::
+    Unlike the first version of `build_many`, which was only intended to get
+    around the Cygwin bug, this version can also return a result, and thus can
+    be used as a replacement for `multiprocessing.Pool.map` (i.e. it still
+    blocks until the result is ready)::
+
+        sage: def square(N):
+        ....:     return N * N
+        sage: build_many(square, range(100))
+        [0, 1, 4, 9, ..., 9604, 9801]
+
+    If the target function raises an exception in any of the workers,
+    `build_many` raises that exception and all other results are discarded.
+    Any in-progress tasks may still be allowed to complete gracefully before
+    the exception is raised::
 
         sage: def target(N):
-        ....:     import time
+        ....:     import time, os, signal
         ....:     if N == 4:
         ....:         # Task 4 is a poison pill
        ....:         1 / 0
@@ -67,48 +88,86 @@ def _build_many(target, args, processes=None):
     traceback from the failing process on stderr.  However, due to how the
     doctest runner works, the doctest will only expect the final exception::
 
-        sage: _build_many(target, range(8), processes=8)
+        sage: build_many(target, range(8), processes=8)
         Traceback (most recent call last):
         ...
-        WorkerDiedException: worker for 4 died with non-zero exit code 1
+        ZeroDivisionError: rational division by zero
+
+    Similarly, if one of the worker processes dies unexpectedly or otherwise
+    exits non-zero (e.g. killed by a signal), any in-progress tasks will be
+    completed gracefully, but then a `RuntimeError` is raised and pending
+    tasks are not started::
+
+        sage: def target(N):
+        ....:     import time, os, signal
+        ....:     if N == 4:
+        ....:         # Task 4 is a poison pill
+        ....:         os.kill(os.getpid(), signal.SIGKILL)
+        ....:     else:
+        ....:         time.sleep(0.5)
+        ....:         print('Processed task %s' % N)
+        ....:
+        sage: build_many(target, range(8), processes=8)
+        Traceback (most recent call last):
+        ...
+        WorkerDiedException: worker for 4 died with non-zero exit code -9
     """
-    from multiprocessing import Process
-    from .build_options import NUM_THREADS, ABORT_ON_ERROR
+    from multiprocessing import Process, Queue, cpu_count
+    from six.moves.queue import Empty
 
     if processes is None:
-        processes = NUM_THREADS
+        processes = cpu_count()
 
     workers = [None] * processes
-    queue = list(args)
-
-    # Maps worker process PIDs to the name of the document it's working
-    # on (the argument it was passed).  This is primarily used just for
-    # debugging/information purposes.
-    jobs = {}
+    tasks = enumerate(args)
+    results = []
+    result_queue = Queue()
 
     ### Utility functions ###
+    def run_worker(target, queue, idx, task):
+        try:
+            result = target(task)
+        except BaseException as exc:
+            queue.put((None, exc))
+        else:
+            queue.put((idx, result))
 
-    def bring_out_yer_dead(w, exitcode):
+    def bring_out_yer_dead(w, task, exitcode):
         """
-        Handle a dead / completed worker.  Raises WorkerDiedError if it
+        Handle a dead / completed worker.  Raises WorkerDiedException if it
         returned with a non-zero exit code.
         """
 
         if w is None or exitcode is None:
             # I'm not dead yet! (or I haven't even been born yet)
-            return w
+            return (w, task)
 
         # Hack: If we wait()ed on this worker manually we have to tell it
         # it's dead:
         if w._popen.returncode is None:
             w._popen.returncode = exitcode
 
-        if exitcode != 0 and ABORT_ON_ERROR:
+        if exitcode != 0:
             raise WorkerDiedException(
                 "worker for {} died with non-zero exit code "
-                "{}".format(jobs[w.pid], w.exitcode))
+                "{}".format(task[1], w.exitcode))
+
+        # Get result from the queue; depending on ordering this may not be
+        # *the* result for this worker, but for each completed worker there
+        # should be *a* result so let's get it
+        try:
+            result = result_queue.get_nowait()
+        except Empty:
+            # Generally shouldn't happen but could in case of a race condition;
+            # don't worry, we'll collect any remaining results at the end.
+            pass
+        else:
+            if result[0] is None:
+                # Indicates that an exception occurred in the target function
+                raise WorkerDiedException('', original_exception=result[1])
+            else:
+                results.append(result)
 
-        jobs.pop(w.pid)
         # Helps multiprocessing with some internal bookkeeping
         w.join()
@@ -147,20 +206,28 @@ def reap_workers(waited_pid=None, waited_exitcode=None):
         for idx, w in enumerate(workers):
             if w is not None:
+                w, task = w
                 if w.pid == waited_pid:
                     exitcode = waited_exitcode
                 else:
                     exitcode = w.exitcode
 
-                w = bring_out_yer_dead(w, exitcode)
+                w = bring_out_yer_dead(w, task, exitcode)
 
             # Worker w is dead/not started, so start a new worker
             # in its place with the next document from the queue
-            if w is None and queue:
-                job = queue.pop(0)
-                w = Process(target=target, args=(job,))
-                w.start()
-                jobs[w.pid] = job
+            if w is None:
+                try:
+                    task = next(tasks)
+                except StopIteration:
+                    pass
+                else:
+                    w = Process(target=run_worker,
+                                args=((target, result_queue) + task))
+                    w.start()
+                    # Pair the new worker with the task it's performing (mostly
+                    # for debugging purposes)
+                    w = (w, task)
 
             workers[idx] = w
@@ -197,7 +264,7 @@ def reap_workers(waited_pid=None, waited_exitcode=None):
     finally:
         try:
             remaining_workers = [w for w in workers if w is not None]
-            for w in remaining_workers:
+            for w, _ in remaining_workers:
                 # Give any remaining workers a chance to shut down gracefully
                 try:
                     w.terminate()
@@ -205,10 +272,26 @@ def reap_workers(waited_pid=None, waited_exitcode=None):
                     if exc.errno != errno.ESRCH:
                         # Otherwise it was already dead so this was expected
                         raise
-            for w in remaining_workers:
+            for w, _ in remaining_workers:
                 w.join()
         finally:
             if worker_exc is not None:
                 # Re-raise the RuntimeError from bring_out_yer_dead set if a
-                # worker died unexpectedly
-                raise worker_exc
+                # worker died unexpectedly, or the original exception if it's
+                # wrapping one
+                if worker_exc.original_exception:
+                    raise worker_exc.original_exception
+                else:
+                    raise worker_exc
+
+    # All workers should be shut down by now and should have completed without
+    # error.  No new items will be added to the result queue, so we can get all
+    # the remaining results, if any.
+    while True:
+        try:
+            results.append(result_queue.get_nowait())
+        except Empty:
+            break
+
+    # Return the results sorted according to their original task order
+    return [r[1] for r in sorted(results, key=lambda r: r[0])]
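The core pattern patch 1 implements, one worker process per task shipping an
(index, result) pair back through a queue, with the results re-sorted into
task order at the end, can be reduced to a standalone sketch.  The following
is illustrative only and not part of the patch: toy_build_many is an invented
name, the 'fork' start method is assumed (so the nested function can be used
as a Process target), and unlike the real build_many it neither caps the
number of simultaneous processes nor detects dead workers; if a worker dies,
it simply hangs instead of raising WorkerDiedException::

    from multiprocessing import Process, Queue

    def toy_build_many(target, args):
        result_queue = Queue()

        def run_worker(idx, arg):
            # Ship the task index back with the result so the original
            # task ordering can be restored at the end.
            result_queue.put((idx, target(arg)))

        workers = [Process(target=run_worker, args=task)
                   for task in enumerate(args)]
        for w in workers:
            w.start()
        # Exactly one result per worker; drain the queue *before* joining
        # to avoid the well-known Queue/join deadlock.  This blocks until
        # every worker reports in, which is why a dying worker would hang
        # this naive version.
        results = [result_queue.get() for _ in workers]
        for w in workers:
            w.join()
        # Sort by task index, then strip the index, as build_many does
        return [r[1] for r in sorted(results)]

    if __name__ == '__main__':
        print(toy_build_many(lambda n: n * n, range(8)))
        # [0, 1, 4, 9, 16, 25, 36, 49]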
From e8d26b6c450c2e9aeac9e78efe64d6e4dbfbe8c7 Mon Sep 17 00:00:00 2001
From: "Erik M. Bray"
Date: Sat, 1 Jan 2011 21:23:06 +0000
Subject: [PATCH 2/2] Use new build_many on all platforms.

It is more stable in general and avoids problems like
https://trac.sagemath.org/ticket/26608
---
 src/sage_setup/docbuild/__init__.py | 45 ++++++-----------------------
 1 file changed, 9 insertions(+), 36 deletions(-)

diff --git a/src/sage_setup/docbuild/__init__.py b/src/sage_setup/docbuild/__init__.py
index 1f9bce3bb94..045c6ad4f27 100644
--- a/src/sage_setup/docbuild/__init__.py
+++ b/src/sage_setup/docbuild/__init__.py
@@ -59,7 +59,7 @@
 import sage.all
 from sage.misc.cachefunc import cached_method
 from sage.misc.misc import sage_makedirs
-from sage.env import DOT_SAGE, SAGE_DOC_SRC, SAGE_DOC, SAGE_SRC, CYGWIN_VERSION
+from sage.env import SAGE_DOC_SRC, SAGE_DOC, SAGE_SRC
 
 from .build_options import (LANGUAGES, SPHINXOPTS, PAPER, OMIT,
                             PAPEROPTS, ALLSPHINXOPTS, NUM_THREADS, WEBSITESPHINXOPTS,
@@ -270,44 +270,17 @@ def clean(self, *args):
     inventory = builder_helper('inventory')
 
 
-def _build_many(target, args):
-    # Pool() uses an actual fork() to run each new instance. This is
-    # important for performance reasons, i.e., don't use a forkserver when
-    # it becomes available with Python 3: Here, sage is already initialized
-    # which is quite costly, with a forkserver we would have to
-    # reinitialize it for every document we build.  At the same time, don't
-    # serialize this by taking the pool (and thus the call to fork()) out
-    # completely: The call to Sphinx leaks memory, so we need to build each
-    # document in its own process to control the RAM usage.
-    from multiprocessing import Pool
-    pool = Pool(NUM_THREADS, maxtasksperchild=1)
-    # map_async handles KeyboardInterrupt correctly. Plain map and
-    # apply_async does not, so don't use it.
-    x = pool.map_async(target, args, 1)
+from .utils import build_many as _build_many
+def build_many(target, args):
+    """
+    Thin wrapper around `sage_setup.docbuild.utils.build_many` which uses the
+    docbuild settings ``NUM_THREADS`` and ``ABORT_ON_ERROR``.
+    """
     try:
-        ret = x.get(99999)
-        pool.close()
-        pool.join()
-    except Exception:
-        pool.terminate()
+        return _build_many(target, args, processes=NUM_THREADS)
+    except BaseException:
         if ABORT_ON_ERROR:
             raise
-    return ret
-
-if (os.environ.get('SAGE_PARI_CFG', '') !='') and (not (CYGWIN_VERSION and CYGWIN_VERSION[0] < 3)):
-    build_many = _build_many
-else:
-    # Cygwin 64-bit < 3.0.0 has a bug with exception handling when exceptions
-    # occur in pthreads, so it's dangerous to use multiprocessing.Pool, as
-    # signals can't be properly handled in worker processes, and they can crash
-    # causing the docbuild to hang. But where are these pthreads, you ask?
-    # Well, multiprocessing.Pool runs a thread from which it starts new worker
-    # processes when old workers complete/die, so the worker processes behave
-    # as though they were started from a pthread, even after fork(), and are
-    # actually succeptible to this bug.  As a workaround, here's a naïve but
-    # good-enough "pool" replacement that does not use threads
-    # https://trac.sagemath.org/ticket/27214#comment:25 for further discussion.
-    from .utils import _build_many as build_many
 
 
 ##########################################
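Patch 2's wrapper relies on the exception-wrapping convention from patch 1:
an exception raised by the target function travels back through the result
queue, is wrapped in WorkerDiedException, and is unwrapped again on re-raise,
so the caller sees the original exception rather than the bookkeeping error.
That convention can be exercised in isolation; in the sketch below the
WorkerDiedException class mirrors the one added in patch 1, while
demo_reraise is an invented name used purely for illustration::

    class WorkerDiedException(RuntimeError):
        """Raised if a worker process dies unexpectedly."""
        def __init__(self, message, original_exception=None):
            super(WorkerDiedException, self).__init__(message)
            self.original_exception = original_exception

    def demo_reraise(worker_exc):
        # Mirrors the inner `finally` block of build_many: surface the
        # target function's own exception when one was captured, and the
        # bookkeeping error otherwise.
        if worker_exc.original_exception:
            raise worker_exc.original_exception
        else:
            raise worker_exc

    try:
        try:
            1 / 0  # the "poison pill" from the doctests
        except BaseException as exc:
            # What run_worker would ship back through the result queue
            demo_reraise(WorkerDiedException('', original_exception=exc))
    except ZeroDivisionError as exc:
        print('caller sees the original exception: %r' % exc)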