diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 97165264b34bbe..222e096f57e3e0 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -345,7 +345,7 @@ def run(self): def test_limbo_cleanup(self): # Issue 7481: Failure to start thread should cleanup the limbo map. - def fail_new_thread(*args): + def fail_new_thread(*args, **kwargs): raise threading.ThreadError() _start_new_thread = threading._start_new_thread threading._start_new_thread = fail_new_thread diff --git a/Lib/threading.py b/Lib/threading.py index df273870fa4273..7ea828c64b80da 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -49,6 +49,11 @@ except AttributeError: _CRLock = None TIMEOUT_MAX = _thread.TIMEOUT_MAX +_wait_for_threads_fini = _thread._wait_for_threads_fini +try: + _internal_after_fork = _thread._after_fork +except AttributeError: + _internal_after_fork = None del _thread @@ -968,7 +973,7 @@ def start(self): with _active_limbo_lock: _limbo[self] = self try: - _start_new_thread(self._bootstrap, ()) + _start_new_thread(self._bootstrap, (), daemonic=self._daemonic) except Exception: with _active_limbo_lock: del _limbo[self] @@ -1589,6 +1594,7 @@ def _shutdown(): pass # Join all non-deamon threads + # XXX We should be able to drop this in favor of _wait_for_threads_fini(). while True: with _shutdown_locks_lock: locks = list(_shutdown_locks) @@ -1605,6 +1611,9 @@ def _shutdown(): # new threads can be spawned while we were waiting for the other # threads to complete + # Wait for all non-daemon threads to be finalized. + _wait_for_threads_fini() + def main_thread(): """Return the main thread object. @@ -1677,4 +1686,6 @@ def _after_fork(): if hasattr(_os, "register_at_fork"): + if _internal_after_fork is not None: + _os.register_at_fork(after_in_child=_internal_after_fork) _os.register_at_fork(after_in_child=_after_fork) diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c index 5d753b4a0ebc5e..dd085d7d5aa5cb 100644 --- a/Modules/_threadmodule.c +++ b/Modules/_threadmodule.c @@ -22,7 +22,260 @@ static struct PyModuleDef thread_module; +/* threads owned by the module */ + +struct module_thread { + PyThreadState *tstate; + int daemonic; + int running; + PyThread_type_lock lifetime_mutex; + int lifetime_mutex_held; + struct module_thread *prev; + struct module_thread *next; +}; + +struct module_threads { + PyThread_type_lock mutex; + struct module_thread *head; + struct module_thread *tail; +}; + +static int +module_threads_init(struct module_threads *threads) +{ + threads->head = NULL; + threads->tail = NULL; + threads->mutex = PyThread_allocate_lock(); + if (threads->mutex == NULL) { + PyErr_NoMemory(); + return -1; + } + return 0; +} + +#ifdef HAVE_FORK +static int module_thread_reinit(struct module_thread *); + +static int +module_threads_reinit(struct module_threads *threads) +{ + if (_PyThread_at_fork_reinit(threads->mutex) < 0) { + PyErr_SetString(ThreadError, "failed to reinitialize lock at fork"); + return -1; + } + + PyThread_acquire_lock(threads->mutex, WAIT_LOCK); + + struct module_thread *mt = threads->head; + while (mt != NULL) { + if (module_thread_reinit(mt) < 0) { + return -1; + } + mt = mt->next; + } + + PyThread_release_lock(threads->mutex); + + return 0; +} +#endif + +static void +module_threads_fini(struct module_threads *threads) +{ + // Wait for all the threads to finalize. + int done = 0; + while (!done) { + done = 1; + PyThread_acquire_lock(threads->mutex, WAIT_LOCK); + struct module_thread *mt = threads->head; + while (mt != NULL) { + if (mt->running) { + assert(mt->daemonic); + // It was killed with PyThread_exit_thread(). + } + else { + done = 0; + break; + } + mt = mt->next; + } + PyThread_release_lock(threads->mutex); + } + + PyThread_free_lock(threads->mutex); +} + +static void +module_threads_add(struct module_threads *threads, struct module_thread *mt) +{ + PyThread_acquire_lock(threads->mutex, WAIT_LOCK); + + // Add it to the end of the list. + if (threads->head == NULL) { + threads->head = mt; + } + else { + mt->prev = threads->tail; + threads->tail->next = mt; + } + threads->tail = mt; + + PyThread_release_lock(threads->mutex); +} + +static void +module_threads_remove(struct module_threads *threads, struct module_thread *mt) +{ + PyThread_acquire_lock(threads->mutex, WAIT_LOCK); + + if (mt->prev == NULL) { + threads->head = mt->next; + } + else { + mt->prev->next = mt->next; + } + if (mt->next == NULL) { + threads->tail = mt->prev; + } + else { + mt->next->prev = mt->prev; + } + + PyThread_release_lock(threads->mutex); +} + +static struct module_thread * +add_module_thread(struct module_threads *threads, + PyThreadState *tstate, int daemonic) +{ + // Create the new list entry. + struct module_thread *mt = PyMem_RawMalloc(sizeof(struct module_thread)); + if (mt == NULL) { + if (!PyErr_Occurred()) { + PyErr_NoMemory(); + } + return NULL; + } + mt->tstate = tstate; + mt->daemonic = daemonic; + mt->running = 0; + mt->prev = NULL; + mt->next = NULL; + + // Create the lifetime lock. + mt->lifetime_mutex = PyThread_allocate_lock(); + if (mt->lifetime_mutex == NULL) { + PyMem_Free(mt); + return NULL; + } + mt->lifetime_mutex_held = 0; + + // Add the entry to the end of the list. + module_threads_add(threads, mt); + + return mt; +} + +#ifdef HAVE_FORK +static int +module_thread_reinit(struct module_thread *mt) +{ + if (_PyThread_at_fork_reinit(mt->lifetime_mutex) < 0) { + PyErr_SetString(ThreadError, "failed to reinitialize lock at fork"); + return -1; + } + if (mt->lifetime_mutex_held) { + PyThread_acquire_lock(mt->lifetime_mutex, WAIT_LOCK); + } + + return 0; +} +#endif + +static void +module_thread_starting(struct module_thread *mt) +{ + assert(mt->tstate == PyThreadState_Get()); + + mt->tstate->interp->threads.count++; + + // We acquire the lifetime lock here instead of in add_module_thread() + // because we must do it in the actual thread, which wasn't started yet + // when add_module_thread() was called. + PyThread_acquire_lock(mt->lifetime_mutex, WAIT_LOCK); + mt->lifetime_mutex_held = 1; + + mt->running = 1; +} + +static void +module_thread_finished(struct module_thread *mt) +{ + mt->tstate->interp->threads.count--; + + mt->running = 0; + + // Notify other threads that this one is done. + // XXX Do it explicitly here rather than via tstate.on_delete(). +} + +static void +remove_module_thread(struct module_threads *threads, struct module_thread *mt) +{ + // Mark the thread as truly dead now. + if (mt->lifetime_mutex_held) { + PyThread_release_lock(mt->lifetime_mutex); + } + + // Remove it from the list. + module_threads_remove(threads, mt); + + // Deallocate everything. + if (mt->lifetime_mutex != NULL) { + PyThread_free_lock(mt->lifetime_mutex); + mt->lifetime_mutex = NULL; + } + PyMem_RawFree(mt); +} + +static void +wait_for_threads_fini(struct module_threads *threads) +{ + Py_BEGIN_ALLOW_THREADS + + int done = 0; + while (!done) { + PyThread_acquire_lock(threads->mutex, WAIT_LOCK); + + done = 1; + struct module_thread *mt = threads->head; + while (mt != NULL) { + if (!mt->daemonic) { + // Wait for the next thread to be finalized. + if (PyThread_acquire_lock(mt->lifetime_mutex, NOWAIT_LOCK)) { + PyThread_release_lock(mt->lifetime_mutex); + } + else { + done = 0; + break; + } + } + mt = mt->next; + } + + PyThread_release_lock(threads->mutex); + } + + Py_END_ALLOW_THREADS +} + + +/* module state */ + typedef struct { + struct module_threads threads; + PyTypeObject *excepthook_type; PyTypeObject *lock_type; PyTypeObject *local_type; @@ -1054,6 +1307,8 @@ struct bootstate { PyObject *kwargs; PyThreadState *tstate; _PyRuntimeState *runtime; + thread_module_state *module_state; + struct module_thread *module_thread; }; @@ -1071,12 +1326,14 @@ static void thread_run(void *boot_raw) { struct bootstate *boot = (struct bootstate *) boot_raw; - PyThreadState *tstate; + PyThreadState *tstate = boot->tstate; + thread_module_state *state = boot->module_state; + struct module_thread *mt = boot->module_thread; - tstate = boot->tstate; _PyThreadState_Bind(tstate); PyEval_AcquireThread(tstate); - tstate->interp->threads.count++; + + module_thread_starting(mt); PyObject *res = PyObject_Call(boot->func, boot->args, boot->kwargs); if (res == NULL) { @@ -1091,10 +1348,12 @@ thread_run(void *boot_raw) Py_DECREF(res); } + module_thread_finished(mt); + thread_bootstate_free(boot); - tstate->interp->threads.count--; PyThreadState_Clear(tstate); _PyThreadState_DeleteCurrent(tstate); + remove_module_thread(&state->threads, mt); // bpo-44434: Don't call explicitly PyThread_exit_thread(). On Linux with // the glibc, pthread_exit() can abort the whole process if dlopen() fails @@ -1120,14 +1379,19 @@ Return True if daemon threads are allowed in the current interpreter,\n\ and False otherwise.\n"); static PyObject * -thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs) +thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs, PyObject *fkwargs) { _PyRuntimeState *runtime = &_PyRuntime; - PyObject *func, *args, *kwargs = NULL; - if (!PyArg_UnpackTuple(fargs, "start_new_thread", 2, 3, - &func, &args, &kwargs)) + char *kwlist[] = {"", "", "", "daemonic", NULL}; + PyObject *func, *args, *kwargs = NULL; + int daemonic = 0; + if (!PyArg_ParseTupleAndKeywords(fargs, fkwargs, + "OO|Op:start_new_thread", kwlist, + &func, &args, &kwargs, &daemonic)) + { return NULL; + } if (!PyCallable_Check(func)) { PyErr_SetString(PyExc_TypeError, "first arg must be callable"); @@ -1169,6 +1433,15 @@ thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs) } return NULL; } + thread_module_state *state = get_thread_state(self); + boot->module_state = state; + boot->module_thread = add_module_thread( + &state->threads, boot->tstate, daemonic); + if (boot->module_thread == NULL) { + PyThreadState_Clear(boot->tstate); + PyMem_Free(boot); + return NULL; + } boot->runtime = runtime; boot->func = Py_NewRef(func); boot->args = Py_NewRef(args); @@ -1185,7 +1458,7 @@ thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs) } PyDoc_STRVAR(start_new_doc, -"start_new_thread(function, args[, kwargs])\n\ +"start_new_thread(function, args[, kwargs], daemonic=0)\n\ (start_new() is an obsolete synonym)\n\ \n\ Start a new thread and return its identifier. The thread will call the\n\ @@ -1563,11 +1836,31 @@ PyDoc_STRVAR(excepthook_doc, \n\ Handle uncaught Thread.run() exception."); +static PyObject * +thread__wait_for_threads_fini(PyObject *module, PyObject *Py_UNUSED(ignored)) +{ + thread_module_state *state = get_thread_state(module); + wait_for_threads_fini(&state->threads); + Py_RETURN_NONE; +} + +#ifdef HAVE_FORK +static PyObject * +thread__after_fork(PyObject *module, PyObject *Py_UNUSED(ignored)) +{ + thread_module_state *state = get_thread_state(module); + if (module_threads_reinit(&state->threads) < 0) { + return NULL; + } + Py_RETURN_NONE; +} +#endif + static PyMethodDef thread_methods[] = { {"start_new_thread", (PyCFunction)thread_PyThread_start_new_thread, - METH_VARARGS, start_new_doc}, + METH_VARARGS | METH_KEYWORDS, start_new_doc}, {"start_new", (PyCFunction)thread_PyThread_start_new_thread, - METH_VARARGS, start_new_doc}, + METH_VARARGS | METH_KEYWORDS, start_new_doc}, {"daemon_threads_allowed", (PyCFunction)thread_daemon_threads_allowed, METH_NOARGS, daemon_threads_allowed_doc}, {"allocate_lock", thread_PyThread_allocate_lock, @@ -1592,8 +1885,14 @@ static PyMethodDef thread_methods[] = { METH_VARARGS, stack_size_doc}, {"_set_sentinel", thread__set_sentinel, METH_NOARGS, _set_sentinel_doc}, - {"_excepthook", thread_excepthook, + {"_excepthook", thread_excepthook, METH_O, excepthook_doc}, + {"_wait_for_threads_fini", (PyCFunction)thread__wait_for_threads_fini, + METH_NOARGS, NULL}, +#ifdef HAVE_FORK + {"_after_fork", (PyCFunction)thread__after_fork, + METH_NOARGS, NULL}, +#endif {NULL, NULL} /* sentinel */ }; @@ -1609,6 +1908,11 @@ thread_module_exec(PyObject *module) // Initialize the C thread library PyThread_init_thread(); + // Initialize the list of threads owned by this module. + if (module_threads_init(&state->threads) < 0) { + return -1; + } + // Lock state->lock_type = (PyTypeObject *)PyType_FromSpec(&lock_type_spec); if (state->lock_type == NULL) { @@ -1699,6 +2003,8 @@ thread_module_clear(PyObject *module) static void thread_module_free(void *module) { + thread_module_state *state = get_thread_state(module); + module_threads_fini(&state->threads); thread_module_clear((PyObject *)module); }