Skip to content

[3.12] gh-109047: concurrent.futures catches RuntimeError (#109810) #110126

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 37 additions & 12 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,14 @@ def run(self):
# Main loop for the executor manager thread.

while True:
self.add_call_item_to_queue()
# gh-109047: During Python finalization, self.call_queue.put() may need
# to create a thread, and thread creation can fail with RuntimeError.
try:
self.add_call_item_to_queue()
except BaseException as exc:
cause = format_exception(exc)
self.terminate_broken(cause)
return

result_item, is_broken, cause = self.wait_result_broken_or_wakeup()

Expand Down Expand Up @@ -420,8 +427,8 @@ def wait_result_broken_or_wakeup(self):
try:
result_item = result_reader.recv()
is_broken = False
except BaseException as e:
cause = format_exception(type(e), e, e.__traceback__)
except BaseException as exc:
cause = format_exception(exc)

elif wakeup_reader in ready:
is_broken = False
Expand Down Expand Up @@ -464,7 +471,7 @@ def is_shutting_down(self):
return (_global_shutdown or executor is None
or executor._shutdown_thread)

def terminate_broken(self, cause):
def _terminate_broken(self, cause):
# Terminate the executor because it is in a broken state. The cause
# argument can be used to display more information on the error that
# led the executor into becoming broken.
Expand All @@ -489,7 +496,14 @@ def terminate_broken(self, cause):

# Mark pending tasks as failed.
for work_id, work_item in self.pending_work_items.items():
work_item.future.set_exception(bpe)
try:
work_item.future.set_exception(bpe)
except _base.InvalidStateError:
# set_exception() fails if the future is cancelled: ignore it.
# Trying to check if the future is cancelled before calling
# set_exception() would leave a race condition if the future is
# cancelled between the check and set_exception().
pass
# Delete references to object. See issue16284
del work_item
self.pending_work_items.clear()
Expand All @@ -499,12 +513,14 @@ def terminate_broken(self, cause):
for p in self.processes.values():
p.terminate()

# Prevent queue writing to a pipe which is no longer read.
# https://github.com/python/cpython/issues/94777
self.call_queue._reader.close()
self.call_queue._terminate_broken()

# clean up resources
self.join_executor_internals()
self._join_executor_internals(broken=True)

def terminate_broken(self, cause):
with self.shutdown_lock:
self._terminate_broken(cause)

def flag_executor_shutting_down(self):
# Flag the executor as shutting down and cancel remaining tasks if
Expand Down Expand Up @@ -547,15 +563,24 @@ def shutdown_workers(self):
break

def join_executor_internals(self):
self.shutdown_workers()
with self.shutdown_lock:
self._join_executor_internals()

def _join_executor_internals(self, broken=False):
# If broken, call_queue was closed and so can no longer be used.
if not broken:
self.shutdown_workers()

# Release the queue's resources as soon as possible.
self.call_queue.close()
self.call_queue.join_thread()
with self.shutdown_lock:
self.thread_wakeup.close()
self.thread_wakeup.close()

# If .join() is not called on the created processes then
# some ctx.Queue methods may deadlock on Mac OS X.
for p in self.processes.values():
if broken:
p.terminate()
p.join()

def get_n_children_alive(self):
Expand Down
25 changes: 20 additions & 5 deletions Lib/multiprocessing/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,15 @@ def cancel_join_thread(self):
except AttributeError:
pass

def _terminate_broken(self):
# Close a Queue on error.

# gh-94777: Prevent queue writing to a pipe which is no longer read.
self._reader.close()

self.close()
self.join_thread()

def _start_thread(self):
debug('Queue._start_thread()')

Expand All @@ -169,13 +178,19 @@ def _start_thread(self):
self._wlock, self._reader.close, self._writer.close,
self._ignore_epipe, self._on_queue_feeder_error,
self._sem),
name='QueueFeederThread'
name='QueueFeederThread',
daemon=True,
)
self._thread.daemon = True

debug('doing self._thread.start()')
self._thread.start()
debug('... done self._thread.start()')
try:
debug('doing self._thread.start()')
self._thread.start()
debug('... done self._thread.start()')
except:
# gh-109047: During Python finalization, creating a thread
# can fail with RuntimeError.
self._thread = None
raise

if not self._joincancelled:
self._jointhread = Finalize(
Expand Down
29 changes: 29 additions & 0 deletions Lib/test/test_concurrent_futures/test_process_pool.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import sys
import threading
import time
import unittest
from concurrent import futures
Expand Down Expand Up @@ -187,6 +188,34 @@ def test_max_tasks_early_shutdown(self):
for i, future in enumerate(futures):
self.assertEqual(future.result(), mul(i, i))

def test_python_finalization_error(self):
# gh-109047: Catch RuntimeError on thread creation
# during Python finalization.

context = self.get_context()

# gh-109047: Mock the threading._start_new_thread() function to inject
# RuntimeError: simulate the error raised during Python finalization.
# Let the first thread creation succeed (_ExecutorManagerThread), but make
# the second one fail (QueueFeederThread).
orig_start_new_thread = threading._start_new_thread
nthread = 0
def mock_start_new_thread(func, *args):
nonlocal nthread
if nthread >= 1:
raise RuntimeError("can't create new thread at "
"interpreter shutdown")
nthread += 1
return orig_start_new_thread(func, *args)

with support.swap_attr(threading, '_start_new_thread',
mock_start_new_thread):
executor = self.executor_type(max_workers=2, mp_context=context)
with executor:
with self.assertRaises(BrokenProcessPool):
list(executor.map(mul, [(2, 3)] * 10))
executor.shutdown()


create_executor_tests(globals(), ProcessPoolExecutorTest,
executor_mixins=(ProcessPoolForkMixin,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
:mod:`concurrent.futures`: The *executor manager thread* now catches exceptions
when adding an item to the *call queue*. During Python finalization, creating a
new thread can now raise :exc:`RuntimeError`. Catch the exception and call
``terminate_broken()`` in this case. Patch by Victor Stinner.