Skip to content

Commit

Permalink
pythongh-127933: Add option to run regression tests in parallel
Browse files Browse the repository at this point in the history
This adds a new command line argument, `--parallel-threads` to the
regression test runner to allow it to run individual tests in multiple
threads in parallel in order to find multithreading bugs.
  • Loading branch information
colesbury committed Dec 16, 2024
1 parent 0816738 commit c16bf01
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 2 deletions.
5 changes: 5 additions & 0 deletions Lib/test/libregrtest/cmdline.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ def __init__(self, **kwargs) -> None:
self.print_slow = False
self.random_seed = None
self.use_mp = None
self.parallel_threads = None
self.forever = False
self.header = False
self.failfast = False
Expand Down Expand Up @@ -316,6 +317,10 @@ def _create_parser():
'a single process, ignore -jN option, '
'and failed tests are also rerun sequentially '
'in the same process')
group.add_argument('--parallel-threads', metavar='PARALLEL_THREADS',
type=int,
help='run copies of each test in PARALLEL_THREADS at '
'once')
group.add_argument('-T', '--coverage', action='store_true',
dest='trace',
help='turn on code coverage tracing using the trace '
Expand Down
3 changes: 3 additions & 0 deletions Lib/test/libregrtest/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ def __init__(self, ns: Namespace, _add_python_opts: bool = False):
else:
self.random_seed = ns.random_seed

self.parallel_threads = ns.parallel_threads

# tests
self.first_runtests: RunTests | None = None

Expand Down Expand Up @@ -506,6 +508,7 @@ def create_run_tests(self, tests: TestTuple) -> RunTests:
python_cmd=self.python_cmd,
randomize=self.randomize,
random_seed=self.random_seed,
parallel_threads=self.parallel_threads,
)

def _run_tests(self, selected: TestTuple, tests: TestList | None) -> int:
Expand Down
78 changes: 78 additions & 0 deletions Lib/test/libregrtest/parallel_case.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""Run a test case multiple times in parallel threads."""

import copy
import functools
import threading
import unittest

from unittest import TestCase


class ParallelTestCase(TestCase):
def __init__(self, test_case: TestCase, num_threads: int):
self.test_case = test_case
self.num_threads = num_threads
self._testMethodName = test_case._testMethodName
self._testMethodDoc = test_case._testMethodDoc

def __str__(self):
return f"{str(self.test_case)} [threads={self.num_threads}]"

def run_worker(self, test_case: TestCase, result: unittest.Result,
barrier: threading.Barrier):
barrier.wait()
test_case.run(result)

def run(self, result=None):
if result is None:
result = test_case.defaultTestResult()
startTestRun = getattr(result, 'startTestRun', None)
stopTestRun = getattr(result, 'stopTestRun', None)
if startTestRun is not None:
startTestRun()
else:
stopTestRun = None

# Called at the beginning of each test. See TestCase.run.
result.startTest(self)

cases = [copy.copy(self.test_case) for _ in range(self.num_threads)]
results = [unittest.TestResult() for _ in range(self.num_threads)]

barrier = threading.Barrier(self.num_threads)
threads = []
for case, r in zip(cases, results):
thread = threading.Thread(target=self.run_worker,
args=(case, r, barrier),
daemon=True)
threads.append(thread)

for thread in threads:
thread.start()

for threads in threads:
threads.join()

# Aggregate test results
if all(r.wasSuccessful() for r in results):
result.addSuccess(self)

# Note: We can't call result.addError, result.addFailure, etc. because
# we no longer the original exception, just the string format.
for r in results:
if len(r.errors) > 0 or len(r.failures) > 0:
result._mirrorOutput = True
result.errors.extend(r.errors)
result.failures.extend(r.failures)
result.skipped.extend(r.skipped)
result.expectedFailures.extend(r.expectedFailures)
result.unexpectedSuccesses.extend(r.unexpectedSuccesses)
result.collectedDurations.extend(r.collectedDurations)

if any(r.shouldStop for r in results):
result.stop()

# Test has finished running
result.stopTest(self)
if stopTestRun is not None:
stopTestRun()
3 changes: 3 additions & 0 deletions Lib/test/libregrtest/runtests.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class RunTests:
python_cmd: tuple[str, ...] | None
randomize: bool
random_seed: int | str
parallel_threads: int | None

def copy(self, **override) -> 'RunTests':
state = dataclasses.asdict(self)
Expand Down Expand Up @@ -184,6 +185,8 @@ def bisect_cmd_args(self) -> list[str]:
args.extend(("--python", cmd))
if self.randomize:
args.append(f"--randomize")
if self.parallel_threads:
args.append(f"--parallel-threads={self.parallel_threads}")
args.append(f"--randseed={self.random_seed}")
return args

Expand Down
30 changes: 28 additions & 2 deletions Lib/test/libregrtest/single.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from .save_env import saved_test_environment
from .setup import setup_tests
from .testresult import get_test_runner
from .parallel_case import ParallelTestCase
from .utils import (
TestName,
clear_caches, remove_testfn, abs_module_name, print_warning)
Expand All @@ -27,14 +28,17 @@
PROGRESS_MIN_TIME = 30.0 # seconds


def run_unittest(test_mod, runtests: RunTests):
    """Load the tests from *test_mod* and run them per *runtests* options."""
    loader = unittest.TestLoader()
    suite = loader.loadTestsFromModule(test_mod)

    if loader.errors:
        for message in loader.errors:
            print(message, file=sys.stderr)
        raise Exception("errors while loading tests")

    _filter_suite(suite, match_test)
    nthreads = runtests.parallel_threads
    if nthreads:
        # Wrap each (thread-safe) test so copies run concurrently.
        _parallelize_tests(suite, nthreads)
    return _run_suite(suite)

def _filter_suite(suite, pred):
Expand All @@ -49,6 +53,28 @@ def _filter_suite(suite, pred):
newtests.append(test)
suite._tests = newtests

def _parallelize_tests(suite, parallel_threads: int):
    """Replace each thread-safe test in *suite* with a ParallelTestCase."""

    def _marked_unsafe(test):
        # The opt-out tag may sit on the test method or on its class;
        # both are set by test.support.thread_unsafe.
        method = getattr(test, test._testMethodName)
        owner = method.__self__
        return bool(getattr(method, "__unittest_thread_unsafe__", False)
                    or getattr(owner, "__unittest_thread_unsafe__", False))

    wrapped = []
    for test in suite._tests:
        if isinstance(test, unittest.TestSuite):
            # Recurse into nested suites, rewriting them in place.
            _parallelize_tests(test, parallel_threads)
            wrapped.append(test)
        elif _marked_unsafe(test):
            # Don't parallelize thread-unsafe tests
            wrapped.append(test)
        else:
            wrapped.append(ParallelTestCase(test, parallel_threads))
    suite._tests = wrapped

def _run_suite(suite):
"""Run tests from a unittest.TestSuite-derived class."""
runner = get_test_runner(sys.stdout,
Expand Down Expand Up @@ -133,7 +159,7 @@ def _load_run_test(result: TestResult, runtests: RunTests) -> None:
raise Exception(f"Module {test_name} defines test_main() which "
f"is no longer supported by regrtest")
def test_func():
return run_unittest(test_mod)
return run_unittest(test_mod, runtests)

try:
regrtest_runner(result, test_func, runtests)
Expand Down
15 changes: 15 additions & 0 deletions Lib/test/support/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,21 @@ def wrapper(*args, **kw):
return decorator


def thread_unsafe(reason):
    """Mark a test as not thread safe. When the test runner is run with
    --parallel-threads=N, the test will be run in a single thread.

    Works both with an argument (@thread_unsafe("why")) and bare
    (@thread_unsafe), on test methods and on test case classes.
    """
    def decorator(test_item):
        test_item.__unittest_thread_unsafe__ = True
        # the reason is not currently used
        test_item.__unittest_thread_unsafe__why__ = reason
        return test_item
    if isinstance(reason, (types.FunctionType, type)):
        # Used bare (no parentheses): *reason* is really the decorated
        # function or class.  Also accepting classes here fixes bare
        # @thread_unsafe on a TestCase class, which previously replaced
        # the class with the decorator function itself.
        test_item = reason
        reason = ''
        return decorator(test_item)
    return decorator


def skip_if_buildbot(reason=None):
"""Decorator raising SkipTest if running on a buildbot."""
import getpass
Expand Down
2 changes: 2 additions & 0 deletions Lib/test/test_class.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"Test the functionality of Python classes implementing operators."

import unittest
from test import support
from test.support import cpython_only, import_helper, script_helper, skip_emscripten_stack_overflow

testmeths = [
Expand Down Expand Up @@ -134,6 +135,7 @@ def __%s__(self, *args):
AllTests = type("AllTests", (object,), d)
del d, statictests, method, method_template

@support.thread_unsafe("callLst is shared between threads")
class ClassTests(unittest.TestCase):
def setUp(self):
callLst[:] = []
Expand Down
3 changes: 3 additions & 0 deletions Lib/test/test_descr.py
Original file line number Diff line number Diff line change
Expand Up @@ -1109,6 +1109,7 @@ class MyFrozenSet(frozenset):
with self.assertRaises(TypeError):
frozenset().__class__ = MyFrozenSet

@support.thread_unsafe
def test_slots(self):
# Testing __slots__...
class C0(object):
Expand Down Expand Up @@ -5473,6 +5474,7 @@ def __repr__(self):
{pickle.dumps, pickle._dumps},
{pickle.loads, pickle._loads}))

@support.thread_unsafe
def test_pickle_slots(self):
# Tests pickling of classes with __slots__.

Expand Down Expand Up @@ -5540,6 +5542,7 @@ class E(C):
y = pickle_copier.copy(x)
self._assert_is_copy(x, y)

@support.thread_unsafe
def test_reduce_copying(self):
# Tests pickling and copying new-style classes and objects.
global C1
Expand Down
1 change: 1 addition & 0 deletions Lib/test/test_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@ class COperatorTestCase(OperatorTestCase, unittest.TestCase):
module = c_operator


@support.thread_unsafe("swaps global operator module")
class OperatorPickleTestCase:
def copy(self, obj, proto):
with support.swap_item(sys.modules, 'operator', self.module):
Expand Down
1 change: 1 addition & 0 deletions Lib/test/test_tokenize.py
Original file line number Diff line number Diff line change
Expand Up @@ -1537,6 +1537,7 @@ def test_false_encoding(self):
self.assertEqual(encoding, 'utf-8')
self.assertEqual(consumed_lines, [b'print("#coding=fake")'])

@support.thread_unsafe
def test_open(self):
filename = os_helper.TESTFN + '.py'
self.addCleanup(os_helper.unlink, filename)
Expand Down

0 comments on commit c16bf01

Please sign in to comment.