From 2efa0560747d92c7ba86de1ce562b67b56105300 Mon Sep 17 00:00:00 2001 From: Sam Gross Date: Mon, 14 Oct 2024 17:26:55 +0000 Subject: [PATCH 1/4] gh-125451: Fix deadlock in ProcessPoolExecutor shutdown There was a deadlock when `ProcessPoolExecutor` shuts down at the same time that a queueing thread handles an error when processing a task. Don't use `_shutdown_lock` to protect the `_ThreadWakeup` pipes -- use an internal lock instead. This fixes the ordering deadlock where the `ExecutorManagerThread` holds the `_shutdown_lock` and joins the queueing thread, while the queueing thread is attempting to acquire the `_shutdown_lock` while closing the `_ThreadWakeup`. --- Lib/concurrent/futures/process.py | 42 ++++++++----------- ...-10-14-17-29-34.gh-issue-125451.fmP3T9.rst | 2 + 2 files changed, 19 insertions(+), 25 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2024-10-14-17-29-34.gh-issue-125451.fmP3T9.rst diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 7092b4757b5429..f37ff48e282781 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -68,27 +68,30 @@ class _ThreadWakeup: def __init__(self): self._closed = False + self._lock = threading.Lock() self._reader, self._writer = mp.Pipe(duplex=False) def close(self): - # Please note that we do not take the shutdown lock when + # Please note that we do not take the self._lock when # calling clear() (to avoid deadlocking) so this method can # only be called safely from the same thread as all calls to - # clear() even if you hold the shutdown lock. Otherwise we + # clear() even if you hold the lock. Otherwise we # might try to read from the closed pipe. - if not self._closed: - self._closed = True - self._writer.close() - self._reader.close() + with self._lock: + if not self._closed: + self._closed = True + self._writer.close() + self._reader.close() def wakeup(self): - if not self._closed: - self._writer.send_bytes(b"") + with self._lock: + if not self._closed: + self._writer.send_bytes(b"") def clear(self): - if not self._closed: - while self._reader.poll(): - self._reader.recv_bytes() + assert not self._closed + while self._reader.poll(): + self._reader.recv_bytes() def _python_exit(): @@ -167,10 +170,8 @@ def __init__(self, work_id, fn, args, kwargs): class _SafeQueue(Queue): """Safe Queue set exception to the future object linked to a job""" - def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock, - thread_wakeup): + def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup): self.pending_work_items = pending_work_items - self.shutdown_lock = shutdown_lock self.thread_wakeup = thread_wakeup super().__init__(max_size, ctx=ctx) @@ -179,8 +180,7 @@ def _on_queue_feeder_error(self, e, obj): tb = format_exception(type(e), e, e.__traceback__) e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb))) work_item = self.pending_work_items.pop(obj.work_id, None) - with self.shutdown_lock: - self.thread_wakeup.wakeup() + self.thread_wakeup.wakeup() # work_item can be None if another process terminated. In this # case, the executor_manager_thread fails all work_items # with BrokenProcessPool @@ -296,12 +296,10 @@ def __init__(self, executor): # if there is no pending work item. def weakref_cb(_, thread_wakeup=self.thread_wakeup, - shutdown_lock=self.shutdown_lock, mp_util_debug=mp.util.debug): mp_util_debug('Executor collected: triggering callback for' ' QueueManager wakeup') - with shutdown_lock: - thread_wakeup.wakeup() + thread_wakeup.wakeup() self.executor_reference = weakref.ref(executor, weakref_cb) @@ -429,11 +427,6 @@ def wait_result_broken_or_wakeup(self): elif wakeup_reader in ready: is_broken = False - # No need to hold the _shutdown_lock here because: - # 1. we're the only thread to use the wakeup reader - # 2. we're also the only thread to call thread_wakeup.close() - # 3. we want to avoid a possible deadlock when both reader and writer - # would block (gh-105829) self.thread_wakeup.clear() return result_item, is_broken, cause @@ -735,7 +728,6 @@ def __init__(self, max_workers=None, mp_context=None, self._call_queue = _SafeQueue( max_size=queue_size, ctx=self._mp_context, pending_work_items=self._pending_work_items, - shutdown_lock=self._shutdown_lock, thread_wakeup=self._executor_manager_thread_wakeup) # Killed worker processes can produce spurious "broken pipe" # tracebacks in the queue's own worker thread. But we detect killed diff --git a/Misc/NEWS.d/next/Library/2024-10-14-17-29-34.gh-issue-125451.fmP3T9.rst b/Misc/NEWS.d/next/Library/2024-10-14-17-29-34.gh-issue-125451.fmP3T9.rst new file mode 100644 index 00000000000000..6047906d7b98a0 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2024-10-14-17-29-34.gh-issue-125451.fmP3T9.rst @@ -0,0 +1,2 @@ +Fix deadlock when :class:`ProcessPoolExecutor` shuts down concurrently with +an error when feeding a job to a worker process. From 2a30e4f74e6c44aecd6f88f29f8d0e8be09c3221 Mon Sep 17 00:00:00 2001 From: Sam Gross Date: Tue, 15 Oct 2024 15:13:37 +0000 Subject: [PATCH 2/4] Fix blurb --- .../Library/2024-10-14-17-29-34.gh-issue-125451.fmP3T9.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Misc/NEWS.d/next/Library/2024-10-14-17-29-34.gh-issue-125451.fmP3T9.rst b/Misc/NEWS.d/next/Library/2024-10-14-17-29-34.gh-issue-125451.fmP3T9.rst index 6047906d7b98a0..589988d4d6273f 100644 --- a/Misc/NEWS.d/next/Library/2024-10-14-17-29-34.gh-issue-125451.fmP3T9.rst +++ b/Misc/NEWS.d/next/Library/2024-10-14-17-29-34.gh-issue-125451.fmP3T9.rst @@ -1,2 +1,2 @@ -Fix deadlock when :class:`ProcessPoolExecutor` shuts down concurrently with -an error when feeding a job to a worker process. +Fix deadlock when :class:`concurrent.futures.ProcessPoolExecutor` shuts down +concurrently with an error when feeding a job to a worker process. From b1f599bde3c655b8d21c587ce5e347b6881df458 Mon Sep 17 00:00:00 2001 From: Sam Gross Date: Tue, 15 Oct 2024 16:42:21 +0000 Subject: [PATCH 3/4] Changes from review --- Lib/concurrent/futures/process.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index f37ff48e282781..42eee72bc1457f 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -89,7 +89,8 @@ def wakeup(self): self._writer.send_bytes(b"") def clear(self): - assert not self._closed + if self._closed: + raise RuntimeError('operation on closed _ThreadWakeup') while self._reader.poll(): self._reader.recv_bytes() @@ -714,10 +715,9 @@ def __init__(self, max_workers=None, mp_context=None, # as it could result in a deadlock if a worker process dies with the # _result_queue write lock still acquired. # - # _shutdown_lock must be locked to access _ThreadWakeup.close() and - # .wakeup(). Care must also be taken to not call clear or close from - # more than one thread since _ThreadWakeup.clear() is not protected by - # the _shutdown_lock + # Care must be taken to only call clear and close from the + # executor_manager_thread, since _ThreadWakeup.clear() is not protected + # by a lock. self._executor_manager_thread_wakeup = _ThreadWakeup() # Create communication channels for the executor From 84db20196d7cd949fc5a150b6e4bf7df73dd1810 Mon Sep 17 00:00:00 2001 From: Sam Gross Date: Tue, 15 Oct 2024 16:43:46 +0000 Subject: [PATCH 4/4] Reenable test_processes_terminate --- Lib/test/test_concurrent_futures/test_shutdown.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/Lib/test/test_concurrent_futures/test_shutdown.py b/Lib/test/test_concurrent_futures/test_shutdown.py index ba3618614a9bf9..7a4065afd46fc8 100644 --- a/Lib/test/test_concurrent_futures/test_shutdown.py +++ b/Lib/test/test_concurrent_futures/test_shutdown.py @@ -253,9 +253,6 @@ def test_cancel_futures_wait_false(self): class ProcessPoolShutdownTest(ExecutorShutdownTest): - # gh-125451: 'lock' cannot be serialized, the test is broken - # and hangs randomly - @unittest.skipIf(True, "broken test") def test_processes_terminate(self): def acquire_lock(lock): lock.acquire()