Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-123940: Ensure force-terminated daemon threads can be joined #124150

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Include/internal/pycore_interp.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ struct _is {
or the size specified by the THREAD_STACK_SIZE macro. */
/* Used in Python/thread.c. */
size_t stacksize;

/* Linked lists of ThreadHandles for threads created by
the threading module.
*/
struct llist_node non_daemon_handles;
struct llist_node daemon_handles;
} threads;

/* Reference to the _PyRuntime global variable. This field exists
Expand Down
2 changes: 2 additions & 0 deletions Include/internal/pycore_pythread.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ extern int _PyThread_at_fork_reinit(PyThread_type_lock *lock);
extern void _PyThread_AfterFork(struct _pythread_runtime_state *state);
#endif /* HAVE_FORK */

extern void _PyThread_DaemonThreadsForceKilled(PyInterpreterState *interp);
extern void _PyThread_ClearThreadHandles(PyInterpreterState *interp);

// unset: -1 seconds, in nanoseconds
#define PyThread_UNSET_TIMEOUT ((PyTime_t)(-1 * 1000 * 1000 * 1000))
Expand Down
35 changes: 35 additions & 0 deletions Lib/test/test_threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,41 @@ def __del__(self):
self.assertEqual(out.strip(), b"OK")
self.assertIn(b"can't create new thread at interpreter shutdown", err)

@unittest.skipIf(support.Py_GIL_DISABLED, "gh-124149: daemon threads don't force exit")
def test_join_force_terminated_daemon_thread_in_finalization(self):
# gh-123940: Py_Finalize() forces all daemon threads to exit
# immediately (without unwinding the stack) upon acquiring the
# GIL. Finalizers that run after this must be able to join the daemon
# threads that were forced to exit.
code = textwrap.dedent("""
import threading


def loop():
while True:
pass
Comment on lines +1184 to +1186
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A slight variant where loop() calls time.sleep(1) hard crashes in 3.11 and 3.12. I'm not sure what to make of that, other than this sort of behavior wasn't robust previously either. I think it's noteworthy that the change that led to the issue was from just a few days ago -- it doesn't look like it was some longstanding code that just broke now.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

possibly #87135 related.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the issue #116514 colesbury linked when I asked (I wasn't able to repro just the time.sleep variant mentioned here)



class Cycle:
def __init__(self):
self.self_ref = self
self.thr = threading.Thread(target=loop, daemon=True)
self.thr.start()

def __del__(self):
self.thr.join()
print('__del__ called')

# Cycle holds a reference to itself, which ensures it is cleaned
# up during the GC that runs after daemon threads have been
# forced to exit during finalization.
Cycle()
""")
rc, out, err = assert_python_ok("-c", code)
self.assertEqual(err, b"")
self.assertIn(b"__del__ called", out)


class ThreadJoinOnShutdown(BaseTestCase):

def _run_and_join(self, script):
Expand Down
92 changes: 57 additions & 35 deletions Modules/_threadmodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ typedef struct {
PyTypeObject *local_type;
PyTypeObject *local_dummy_type;
PyTypeObject *thread_handle_type;

// Linked list of handles to all non-daemon threads created by the
// threading module. We wait for these to finish at shutdown.
struct llist_node shutdown_handles;
} thread_module_state;

static inline thread_module_state*
Expand Down Expand Up @@ -78,7 +74,8 @@ typedef enum {
typedef struct {
struct llist_node node; // linked list node (see _pythread_runtime_state)

// linked list node (see thread_module_state)
// belongs to either `non_daemon_handles` or `daemon_handles` on
// PyInterpreterState
struct llist_node shutdown_node;

// The `ident`, `os_handle`, `has_os_handle`, and `state` fields are
Expand Down Expand Up @@ -143,19 +140,26 @@ ThreadHandle_get_os_handle(ThreadHandle *handle, PyThread_handle_t *os_handle)
}

static void
add_to_shutdown_handles(thread_module_state *state, ThreadHandle *handle)
add_to_shutdown_handles(struct llist_node *shutdown_handles,
ThreadHandle *handle)
{
HEAD_LOCK(&_PyRuntime);
llist_insert_tail(&state->shutdown_handles, &handle->shutdown_node);
llist_insert_tail(shutdown_handles, &handle->shutdown_node);
HEAD_UNLOCK(&_PyRuntime);
}

static void
clear_shutdown_handles(thread_module_state *state)
// Remove any remaining handles (e.g. if shutdown exited early due to
// interrupt) so that attempts to unlink the handle after our module state
// is destroyed do not crash.
void
_PyThread_ClearThreadHandles(PyInterpreterState *interp)
{
HEAD_LOCK(&_PyRuntime);
struct llist_node *node;
llist_for_each_safe(node, &state->shutdown_handles) {
llist_for_each_safe(node, &interp->threads.daemon_handles) {
llist_remove(node);
}
llist_for_each_safe(node, &interp->threads.non_daemon_handles) {
llist_remove(node);
}
HEAD_UNLOCK(&_PyRuntime);
Expand Down Expand Up @@ -378,7 +382,7 @@ force_done(ThreadHandle *handle)

static int
ThreadHandle_start(ThreadHandle *self, PyObject *func, PyObject *args,
PyObject *kwargs)
PyObject *kwargs, struct llist_node *shutdown_handles)
{
// Mark the handle as starting to prevent any other threads from doing so
PyMutex_Lock(&self->mutex);
Expand All @@ -390,6 +394,11 @@ ThreadHandle_start(ThreadHandle *self, PyObject *func, PyObject *args,
self->state = THREAD_HANDLE_STARTING;
PyMutex_Unlock(&self->mutex);

// Add the handle before starting the thread to avoid adding a handle
// to a thread that has already finished (i.e. if the thread finishes
// before the call to `ThreadHandle_start()` below returns).
add_to_shutdown_handles(shutdown_handles, self);

// Do all the heavy lifting outside of the mutex. All other operations on
// the handle should fail since the handle is in the starting state.

Expand Down Expand Up @@ -441,6 +450,7 @@ ThreadHandle_start(ThreadHandle *self, PyObject *func, PyObject *args,
return 0;

start_failed:
remove_from_shutdown_handles(self);
_PyOnceFlag_CallOnce(&self->once, (_Py_once_fn_t *)force_done, self);
return -1;
}
Expand Down Expand Up @@ -1857,8 +1867,8 @@ Return True if daemon threads are allowed in the current interpreter,\n\
and False otherwise.\n");

static int
do_start_new_thread(thread_module_state *state, PyObject *func, PyObject *args,
PyObject *kwargs, ThreadHandle *handle, int daemon)
do_start_new_thread(PyObject *func, PyObject *args, PyObject *kwargs,
ThreadHandle *handle, int daemon)
{
PyInterpreterState *interp = _PyInterpreterState_GET();
if (!_PyInterpreterState_HasFeature(interp, Py_RTFLAGS_THREADS)) {
Expand All @@ -1872,17 +1882,12 @@ do_start_new_thread(thread_module_state *state, PyObject *func, PyObject *args,
return -1;
}

if (!daemon) {
// Add the handle before starting the thread to avoid adding a handle
// to a thread that has already finished (i.e. if the thread finishes
// before the call to `ThreadHandle_start()` below returns).
add_to_shutdown_handles(state, handle);
struct llist_node *shutdown_handles = &interp->threads.non_daemon_handles;
if (daemon) {
shutdown_handles = &interp->threads.daemon_handles;
}

if (ThreadHandle_start(handle, func, args, kwargs) < 0) {
if (!daemon) {
remove_from_shutdown_handles(handle);
}
if (ThreadHandle_start(handle, func, args, kwargs, shutdown_handles) < 0) {
return -1;
}

Expand All @@ -1893,7 +1898,6 @@ static PyObject *
thread_PyThread_start_new_thread(PyObject *module, PyObject *fargs)
{
PyObject *func, *args, *kwargs = NULL;
thread_module_state *state = get_thread_state(module);

if (!PyArg_UnpackTuple(fargs, "start_new_thread", 2, 3,
&func, &args, &kwargs))
Expand Down Expand Up @@ -1924,8 +1928,7 @@ thread_PyThread_start_new_thread(PyObject *module, PyObject *fargs)
return NULL;
}

int st =
do_start_new_thread(state, func, args, kwargs, handle, /*daemon=*/1);
int st = do_start_new_thread(func, args, kwargs, handle, /*daemon=*/1);
if (st < 0) {
ThreadHandle_decref(handle);
return NULL;
Expand Down Expand Up @@ -2002,8 +2005,9 @@ thread_PyThread_start_joinable_thread(PyObject *module, PyObject *fargs,
if (args == NULL) {
return NULL;
}
int st = do_start_new_thread(state, func, args,
/*kwargs=*/ NULL, ((PyThreadHandleObject*)hobj)->handle, daemon);
int st = do_start_new_thread(
func, args,
/*kwargs=*/NULL, ((PyThreadHandleObject *)hobj)->handle, daemon);
Py_DECREF(args);
if (st < 0) {
Py_DECREF(hobj);
Expand Down Expand Up @@ -2364,15 +2368,15 @@ static PyObject *
thread_shutdown(PyObject *self, PyObject *args)
{
PyThread_ident_t ident = PyThread_get_thread_ident_ex();
thread_module_state *state = get_thread_state(self);
PyInterpreterState *interp = _PyInterpreterState_GET();

for (;;) {
ThreadHandle *handle = NULL;

// Find a thread that's not yet finished.
HEAD_LOCK(&_PyRuntime);
struct llist_node *node;
llist_for_each_safe(node, &state->shutdown_handles) {
llist_for_each_safe(node, &interp->threads.non_daemon_handles) {
ThreadHandle *cur = llist_data(node, ThreadHandle, shutdown_node);
if (cur->ident != ident) {
ThreadHandle_incref(cur);
Expand Down Expand Up @@ -2407,6 +2411,30 @@ PyDoc_STRVAR(shutdown_doc,
\n\
Wait for all non-daemon threads (other than the calling thread) to stop.");

/* gh-123940: Mark remaining daemon threads as exited so that they may
* be joined from finalizers.
*/
void
_PyThread_DaemonThreadsForceKilled(PyInterpreterState *interp)
{
for (;;) {
HEAD_LOCK(&_PyRuntime);
if (llist_empty(&interp->threads.daemon_handles)) {
HEAD_UNLOCK(&_PyRuntime);
break;
}
struct llist_node *node = interp->threads.daemon_handles.next;
ThreadHandle *handle = llist_data(node, ThreadHandle, shutdown_node);
ThreadHandle_incref(handle);
llist_remove(node);
HEAD_UNLOCK(&_PyRuntime);
// gh-123940: Mark daemon threads as done so that they can be joined
// from finalizers.
_PyEvent_Notify(&handle->thread_is_exiting);
ThreadHandle_decref(handle);
}
}

static PyObject *
thread__make_thread_handle(PyObject *module, PyObject *identobj)
{
Expand Down Expand Up @@ -2579,8 +2607,6 @@ thread_module_exec(PyObject *module)
return -1;
}

llist_init(&state->shutdown_handles);

return 0;
}

Expand All @@ -2606,10 +2632,6 @@ thread_module_clear(PyObject *module)
Py_CLEAR(state->local_type);
Py_CLEAR(state->local_dummy_type);
Py_CLEAR(state->thread_handle_type);
// Remove any remaining handles (e.g. if shutdown exited early due to
// interrupt) so that attempts to unlink the handle after our module state
// is destroyed do not crash.
clear_shutdown_handles(state);
return 0;
}

Expand Down
1 change: 1 addition & 0 deletions PCbuild/_freeze_module.vcxproj
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
<ClCompile Include="..\Modules\getpath_noop.c" />
<ClCompile Include="..\Modules\posixmodule.c" />
<ClCompile Include="..\Modules\signalmodule.c" />
<ClCompile Include="..\Modules\_threadmodule.c" />
<ClCompile Include="..\Modules\timemodule.c" />
<ClCompile Include="..\Modules\_tracemalloc.c" />
<ClCompile Include="..\Modules\_io\_iomodule.c" />
Expand Down
3 changes: 3 additions & 0 deletions PCbuild/_freeze_module.vcxproj.filters
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,9 @@
<ClCompile Include="..\Python\thread.c">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="..\Modules\_threadmodule.c">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="..\Modules\timemodule.c">
<Filter>Source Files</Filter>
</ClCompile>
Expand Down
1 change: 1 addition & 0 deletions Python/pylifecycle.c
Original file line number Diff line number Diff line change
Expand Up @@ -2040,6 +2040,7 @@ _Py_Finalize(_PyRuntimeState *runtime)
before we call destructors. */
PyThreadState *list = _PyThreadState_RemoveExcept(tstate);
_PyEval_StartTheWorldAll(runtime);
_PyThread_DaemonThreadsForceKilled(tstate->interp);
_PyThreadState_DeleteList(list);

/* At this point no Python code should be running at all.
Expand Down
4 changes: 3 additions & 1 deletion Python/pystate.c
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,8 @@ init_interpreter(PyInterpreterState *interp,
/* Fix the self-referential, statically initialized fields. */
interp->dtoa = (struct _dtoa_state)_dtoa_state_INIT(interp);
}
llist_init(&interp->threads.daemon_handles);
llist_init(&interp->threads.non_daemon_handles);
Comment on lines +668 to +669
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would make sense to have a _PyThread_InitThreadHandles() for this, to match _PyThread_ClearThreadHandles().


interp->_initialized = 1;
return _PyStatus_OK();
Expand Down Expand Up @@ -811,7 +813,7 @@ interpreter_clear(PyInterpreterState *interp, PyThreadState *tstate)
// XXX Eliminate the need to do this.
tstate->_status.cleared = 0;
}

_PyThread_ClearThreadHandles(interp);
#ifdef _Py_TIER2
_PyOptimizerObject *old = _Py_SetOptimizer(interp, NULL);
assert(old != NULL);
Expand Down
5 changes: 5 additions & 0 deletions Tools/tsan/suppressions_free_threading.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
race:get_allocator_unlocked
race:set_allocator_unlocked

# Triggered by test_threading.ThreadTests.test_join_force_terminated_daemon_thread_in_finalization
# https://gist.github.com/mpage/ff1c7094bc8237d98387678d5f52058f
race_top:_PyThreadState_MustExit


## Free-threaded suppressions


Expand Down
4 changes: 4 additions & 0 deletions Tools/tsan/supressions.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,9 @@
race:get_allocator_unlocked
race:set_allocator_unlocked

# Triggered by test_threading.ThreadTests.test_join_force_terminated_daemon_thread_in_finalization
# https://gist.github.com/mpage/ff1c7094bc8237d98387678d5f52058f
race_top:_PyThreadState_MustExit

# https://gist.github.com/mpage/daaf32b39180c1989572957b943eb665
thread:pthread_create
Loading