Trac #27490: Simplistic multiprocessing.Pool replacement for parallel docbuild on older Cygwin

embray committed Mar 15, 2019
1 parent 1040752 commit f9aabb7

Showing 1 changed file with 75 additions and 26 deletions:
src/sage_setup/docbuild/__init__.py

@@ -43,6 +43,7 @@

import optparse, os, shutil, subprocess, sys, re
import logging, warnings
import time

logger = logging.getLogger(__name__)

@@ -53,7 +54,7 @@
import sage.all
from sage.misc.cachefunc import cached_method
from sage.misc.misc import sage_makedirs
from sage.env import SAGE_DOC_SRC, SAGE_DOC, SAGE_SRC
from sage.env import SAGE_DOC_SRC, SAGE_DOC, SAGE_SRC, CYGWIN_VERSION

from .build_options import (LANGUAGES, SPHINXOPTS, PAPER, OMIT,
                            PAPEROPTS, ALLSPHINXOPTS, NUM_THREADS, WEBSITESPHINXOPTS,
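
The new CYGWIN_VERSION import lets the module pick a build strategy based on the Cygwin release at import time. As a rough sketch of how such a version tuple could be computed (hypothetical; sage.env's actual implementation may differ), the leading dotted version in platform.release() can be parsed on Cygwin:

# Hypothetical sketch -- sage.env's real CYGWIN_VERSION may be computed
# differently.
import platform
import re

def cygwin_version():
    """Return Cygwin's version as a tuple of ints, or None elsewhere."""
    if not platform.system().startswith('CYGWIN'):
        return None
    # On Cygwin, platform.release() looks like '2.11.2(0.329/5/3)'.
    m = re.match(r'(\d+)\.(\d+)\.(\d+)', platform.release())
    return tuple(int(g) for g in m.groups()) if m else None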
@@ -264,29 +265,79 @@ def clean(self, *args):
    # import the customized builder for object.inv files
    inventory = builder_helper('inventory')

def build_many(target, args):
    # Pool() uses an actual fork() to run each new instance. This is important
    # for performance reasons, i.e., don't use a forkserver when it becomes
    # available with Python 3: Here, sage is already initialized which is quite
    # costly, with a forkserver we would have to reinitialize it for every
    # document we build. At the same time, don't serialize this by taking the
    # pool (and thus the call to fork()) out completely: The call to Sphinx
    # leaks memory, so we need to build each document in its own process to
    # control the RAM usage.
    from multiprocessing import Pool
    pool = Pool(NUM_THREADS, maxtasksperchild=1)
    # map_async handles KeyboardInterrupt correctly. Plain map and
    # apply_async do not, so don't use them.
    x = pool.map_async(target, args, 1)
    try:
        ret = x.get(99999)
        pool.close()
        pool.join()
    except Exception:
        pool.terminate()
        if ABORT_ON_ERROR:
            raise
    return ret
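
The comments above capture the trade-off: plain fork() keeps worker startup cheap because Sage is already initialized, while maxtasksperchild=1 contains Sphinx's memory leaks by giving every document a fresh process. A standalone demonstration of that effect (an illustrative sketch, not part of this commit):

# Demonstration only: with maxtasksperchild=1, each task runs in a
# brand-new worker process, so per-task memory leaks die with the worker.
from multiprocessing import Pool
import os

def task(i):
    # Return the worker's PID so process turnover is visible.
    return (i, os.getpid())

if __name__ == '__main__':
    with Pool(2, maxtasksperchild=1) as pool:
        # Chunk size 1 mirrors the map_async(target, args, 1) call above.
        results = pool.map_async(task, range(4), 1).get(99999)
    # Every task reports a distinct PID: workers are replaced after one task.
    print(results)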

if not (CYGWIN_VERSION and CYGWIN_VERSION[0] < 3):
    def build_many(target, args):
        # Pool() uses an actual fork() to run each new instance. This is
        # important for performance reasons, i.e., don't use a forkserver when
        # it becomes available with Python 3: Here, sage is already
        # initialized which is quite costly, with a forkserver we would have
        # to reinitialize it for every document we build. At the same time,
        # don't serialize this by taking the pool (and thus the call to
        # fork()) out completely: The call to Sphinx leaks memory, so we need
        # to build each document in its own process to control the RAM usage.
        from multiprocessing import Pool
        pool = Pool(NUM_THREADS, maxtasksperchild=1)
        # map_async handles KeyboardInterrupt correctly. Plain map and
        # apply_async do not, so don't use them.
        x = pool.map_async(target, args, 1)
        try:
            ret = x.get(99999)
            pool.close()
            pool.join()
        except Exception:
            pool.terminate()
            if ABORT_ON_ERROR:
                raise
        return ret
else:
    # Cygwin 64-bit < 3.0.0 has a bug with exception handling when exceptions
    # occur in pthreads, so it's dangerous to use multiprocessing.Pool, as
    # signals can't be properly handled in worker processes, and they can
    # crash, causing the docbuild to hang. But where are these pthreads, you
    # ask? Well, multiprocessing.Pool runs a thread from which it starts new
    # worker processes when old workers complete/die, so the worker processes
    # behave as though they were started from a pthread, even after fork(),
    # and are actually susceptible to this bug. As a workaround, here's a
    # naïve but good-enough "pool" replacement that does not use threads; see
    # https://trac.sagemath.org/ticket/27214#comment:25 for further
    # discussion.
    def build_many(target, args):
        from multiprocessing import Process
        workers = [None] * NUM_THREADS
        queue = list(args)
        jobs = {}

        try:
            while True:
                for idx, w in enumerate(workers):
                    # Reap finished workers: fail fast if one died with a
                    # non-zero exit code, otherwise free its slot.
                    if w and w.exitcode is not None:
                        if w.exitcode != 0:
                            raise RuntimeError(
                                "worker for {} died with non-zero exit code "
                                "{}".format(jobs[w.pid], w.exitcode))

                        jobs.pop(w.pid)
                        w = None

                    # Fill any free slot with the next job in the queue.
                    if w is None:
                        if queue:
                            job = queue.pop(0)
                            w = Process(target=target, args=(job,))
                            w.start()
                            jobs[w.pid] = job

                    workers[idx] = w

                # All slots empty and no jobs queued: we're done.
                if not any(filter(None, workers)):
                    break

                # Poll for finished workers every few seconds.
                time.sleep(5)
        finally:
            # Make sure no worker processes are left running, e.g. after
            # an error or a KeyboardInterrupt.
            for w in workers:
                if w is not None:
                    w.terminate()
                    w.join()
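
Both branches define the same build_many(target, args) interface, so callers need not know which implementation is active. A hypothetical usage sketch (build_one_doc and doc_names are illustrative stand-ins, not names from this commit):

# Hypothetical caller of build_many; the names below are illustrative.
def build_one_doc(name):
    print("building the '{}' document".format(name))

doc_names = ['tutorial', 'faq', 'reference']

# One worker process per document, whichever build_many was selected above.
build_many(build_one_doc, doc_names)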


##########################################
# Parallel Building Ref Manual #
@@ -318,7 +369,6 @@ def _wrapper(self, name, *args, **kwds):
        This is the function which goes through all of the documents
        and does the actual building.
        """
        import time
        start = time.time()
        docs = self.get_all_documents()
        refs = [x for x in docs if x.endswith('reference')]
@@ -811,7 +861,6 @@ def update_mtimes(self):
"""
env = self.get_sphinx_environment()
if env is not None:
import time
for doc in env.all_docs:
env.all_docs[doc] = time.time()
logger.info("Updated %d reST file mtimes", len(env.all_docs))