Skip to content

Commit 2b2e712

Browse files
ProcessPoolExecutor should not share one BrokenProcessPool exception among all failed futures (#101267)
1 parent 7b20a0f commit 2b2e712

3 files changed

Lines changed: 32 additions & 10 deletions

File tree

Lib/concurrent/futures/process.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -471,17 +471,15 @@ def terminate_broken(self, cause):
471471
executor._shutdown_thread = True
472472
executor = None
473473

474-
# All pending tasks are to be marked failed with the following
475-
# BrokenProcessPool error
476-
bpe = BrokenProcessPool("A process in the process pool was "
477-
"terminated abruptly while the future was "
478-
"running or pending.")
479-
if cause is not None:
480-
bpe.__cause__ = _RemoteTraceback(
481-
f"\n'''\n{''.join(cause)}'''")
482-
483474
# Mark pending tasks as failed.
484475
for work_id, work_item in self.pending_work_items.items():
476+
bpe = BrokenProcessPool(
477+
"A process in the process pool was "
478+
"terminated abruptly while the future was "
479+
"running or pending.")
480+
if cause is not None:
481+
bpe.__cause__ = _RemoteTraceback(
482+
f"\n'''\n{''.join(cause)}'''")
485483
work_item.future.set_exception(bpe)
486484
# Delete references to object. See issue16284
487485
del work_item
@@ -762,7 +760,8 @@ def _spawn_process(self):
762760
def submit(self, fn, /, *args, **kwargs):
763761
with self._shutdown_lock:
764762
if self._broken:
765-
raise BrokenProcessPool(self._broken)
763+
raise BrokenProcessPool(
764+
'cannot schedule new futures after process pool has broken')
766765
if self._shutdown_thread:
767766
raise RuntimeError('cannot schedule new futures after shutdown')
768767
if _global_shutdown:

Lib/test/test_concurrent_futures.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import os
1616
import queue
1717
import sys
18+
import traceback
1819
import threading
1920
import time
2021
import unittest
@@ -980,6 +981,27 @@ def test_killed_child(self):
980981
# Submitting other jobs fails as well.
981982
self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
982983

984+
def test_broken_process_pool_traceback(self):
985+
# When a child process is abruptly terminated, the whole pool gets
986+
# "broken", and a BrokenProcessPool exception should be created
987+
# for each future instead of sharing one exception among all futures.
988+
futures = [self.executor.submit(time.sleep, 3) for _ in range(3)]
989+
# Get one of the processes, and terminate (kill) it.
990+
p = next(iter(self.executor._processes.values()))
991+
p.terminate()
992+
for fut in futures:
993+
count = None
994+
try:
995+
fut.result()
996+
except BrokenProcessPool as e:
997+
count = sum(
998+
1
999+
for frame_summary in traceback.extract_tb(e.__traceback__)
1000+
if frame_summary.filename == __file__
1001+
)
1002+
# This code file should appear exactly once in the traceback.
1003+
self.assertEqual(count, 1)
1004+
9831005
def test_map_chunksize(self):
9841006
def bad_map():
9851007
list(self.executor.map(pow, range(40), range(40), chunksize=-1))

Misc/ACKS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1649,6 +1649,7 @@ Charlie Shepherd
16491649
Bruce Sherwood
16501650
Gregory Shevchenko
16511651
Hai Shi
1652+
Daniel Shields
16521653
Alexander Shigin
16531654
Pete Shinners
16541655
Michael Shiplett

0 commit comments

Comments
 (0)