Skip to content

Commit 49ea36d

Browse files
committed
pythongh-109047: concurrent.futures catches PythonFinalizationError (python#109810)
concurrent.futures: The *executor manager thread* now catches exceptions when adding an item to the *call queue*. During Python finalization, creating a new thread can now raise RuntimeError. Catch the exception and call terminate_broken() in this case. Add test_python_finalization_error() to test_concurrent_futures. concurrent.futures._ExecutorManagerThread changes: * terminate_broken() no longer calls shutdown_workers() since the call queue is no longer working (the read and write ends of the queue pipe are closed). * terminate_broken() now terminates child processes instead of only waiting until they complete. * _ExecutorManagerThread.terminate_broken() now holds shutdown_lock to prevent race conditions with ProcessPoolExecutor.submit(). multiprocessing.Queue changes: * Add _terminate_broken() method. * _start_thread() sets _thread to None on exception to prevent leaking "dangling threads" even if the thread was not started yet. (cherry picked from commit 6351842)
1 parent 8882b30 commit 49ea36d

File tree

4 files changed

+90
-17
lines changed

4 files changed

+90
-17
lines changed

Lib/concurrent/futures/process.py

+37-12
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,14 @@ def run(self):
336336
# Main loop for the executor manager thread.
337337

338338
while True:
339-
self.add_call_item_to_queue()
339+
# gh-109047: During Python finalization, self.call_queue.put()
340+
# may start a thread, and that can fail with RuntimeError.
341+
try:
342+
self.add_call_item_to_queue()
343+
except BaseException as exc:
344+
cause = format_exception(exc)
345+
self.terminate_broken(cause)
346+
return
340347

341348
result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
342349

@@ -420,8 +427,8 @@ def wait_result_broken_or_wakeup(self):
420427
try:
421428
result_item = result_reader.recv()
422429
is_broken = False
423-
except BaseException as e:
424-
cause = format_exception(type(e), e, e.__traceback__)
430+
except BaseException as exc:
431+
cause = format_exception(exc)
425432

426433
elif wakeup_reader in ready:
427434
is_broken = False
@@ -464,7 +471,7 @@ def is_shutting_down(self):
464471
return (_global_shutdown or executor is None
465472
or executor._shutdown_thread)
466473

467-
def terminate_broken(self, cause):
474+
def _terminate_broken(self, cause):
468475
# Terminate the executor because it is in a broken state. The cause
469476
# argument can be used to display more information on the error that
470477
# lead the executor into becoming broken.
@@ -489,7 +496,14 @@ def terminate_broken(self, cause):
489496

490497
# Mark pending tasks as failed.
491498
for work_id, work_item in self.pending_work_items.items():
492-
work_item.future.set_exception(bpe)
499+
try:
500+
work_item.future.set_exception(bpe)
501+
except _base.InvalidStateError:
502+
# set_exception() fails if the future is cancelled: ignore it.
503+
# Trying to check if the future is cancelled before calling
504+
# set_exception() would leave a race condition if the future is
505+
# cancelled between the check and set_exception().
506+
pass
493507
# Delete references to object. See issue16284
494508
del work_item
495509
self.pending_work_items.clear()
@@ -499,12 +513,14 @@ def terminate_broken(self, cause):
499513
for p in self.processes.values():
500514
p.terminate()
501515

502-
# Prevent queue writing to a pipe which is no longer read.
503-
# https://github.com/python/cpython/issues/94777
504-
self.call_queue._reader.close()
516+
self.call_queue._terminate_broken()
505517

506518
# clean up resources
507-
self.join_executor_internals()
519+
self._join_executor_internals(broken=True)
520+
521+
def terminate_broken(self, cause):
522+
with self.shutdown_lock:
523+
self._terminate_broken(cause)
508524

509525
def flag_executor_shutting_down(self):
510526
# Flag the executor as shutting down and cancel remaining tasks if
@@ -547,15 +563,24 @@ def shutdown_workers(self):
547563
break
548564

549565
def join_executor_internals(self):
550-
self.shutdown_workers()
566+
with self.shutdown_lock:
567+
self._join_executor_internals()
568+
569+
def _join_executor_internals(self, broken=False):
570+
# If broken, call_queue was closed and so can no longer be used.
571+
if not broken:
572+
self.shutdown_workers()
573+
551574
# Release the queue's resources as soon as possible.
552575
self.call_queue.close()
553576
self.call_queue.join_thread()
554-
with self.shutdown_lock:
555-
self.thread_wakeup.close()
577+
self.thread_wakeup.close()
578+
556579
# If .join() is not called on the created processes then
557580
# some ctx.Queue methods may deadlock on Mac OS X.
558581
for p in self.processes.values():
582+
if broken:
583+
p.terminate()
559584
p.join()
560585

561586
def get_n_children_alive(self):

Lib/multiprocessing/queues.py

+20-5
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,15 @@ def cancel_join_thread(self):
158158
except AttributeError:
159159
pass
160160

161+
def _terminate_broken(self):
162+
# Close a Queue on error.
163+
164+
# gh-94777: Prevent queue writing to a pipe which is no longer read.
165+
self._reader.close()
166+
167+
self.close()
168+
self.join_thread()
169+
161170
def _start_thread(self):
162171
debug('Queue._start_thread()')
163172

@@ -169,13 +178,19 @@ def _start_thread(self):
169178
self._wlock, self._reader.close, self._writer.close,
170179
self._ignore_epipe, self._on_queue_feeder_error,
171180
self._sem),
172-
name='QueueFeederThread'
181+
name='QueueFeederThread',
182+
daemon=True,
173183
)
174-
self._thread.daemon = True
175184

176-
debug('doing self._thread.start()')
177-
self._thread.start()
178-
debug('... done self._thread.start()')
185+
try:
186+
debug('doing self._thread.start()')
187+
self._thread.start()
188+
debug('... done self._thread.start()')
189+
except:
190+
# gh-109047: During Python finalization, creating a thread
191+
# can fail with RuntimeError.
192+
self._thread = None
193+
raise
179194

180195
if not self._joincancelled:
181196
self._jointhread = Finalize(

Lib/test/test_concurrent_futures/test_process_pool.py

+29
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import os
22
import sys
3+
import threading
34
import time
45
import unittest
56
from concurrent import futures
@@ -187,6 +188,34 @@ def test_max_tasks_early_shutdown(self):
187188
for i, future in enumerate(futures):
188189
self.assertEqual(future.result(), mul(i, i))
189190

191+
def test_python_finalization_error(self):
192+
# gh-109047: Catch RuntimeError on thread creation
193+
# during Python finalization.
194+
195+
context = self.get_context()
196+
197+
# gh-109047: Mock the threading._start_new_thread() function to inject
198+
# RuntimeError: simulate the error raised during Python finalization.
199+
# Block the second creation: create _ExecutorManagerThread, but block
200+
# QueueFeederThread.
201+
orig_start_new_thread = threading._start_new_thread
202+
nthread = 0
203+
def mock_start_new_thread(func, *args):
204+
nonlocal nthread
205+
if nthread >= 1:
206+
raise RuntimeError("can't create new thread at "
207+
"interpreter shutdown")
208+
nthread += 1
209+
return orig_start_new_thread(func, *args)
210+
211+
with support.swap_attr(threading, '_start_new_thread',
212+
mock_start_new_thread):
213+
executor = self.executor_type(max_workers=2, mp_context=context)
214+
with executor:
215+
with self.assertRaises(BrokenProcessPool):
216+
list(executor.map(mul, [(2, 3)] * 10))
217+
executor.shutdown()
218+
190219

191220
create_executor_tests(globals(), ProcessPoolExecutorTest,
192221
executor_mixins=(ProcessPoolForkMixin,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
:mod:`concurrent.futures`: The *executor manager thread* now catches exceptions
2+
when adding an item to the *call queue*. During Python finalization, creating a
3+
new thread can now raise :exc:`RuntimeError`. Catch the exception and call
4+
``terminate_broken()`` in this case. Patch by Victor Stinner.

0 commit comments

Comments
 (0)