Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions Lib/concurrent/futures/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ def __init__(self, max_workers=None, thread_name_prefix='',
self._threads = set()
self._broken = False
self._shutdown = False
self._shutdown_lock = threading.Lock()
self._shutdown_lock = threading.RLock()
self._thread_name_prefix = (thread_name_prefix or
("ThreadPoolExecutor-%d" % self._counter()))

Expand All @@ -213,6 +213,10 @@ def submit(self, fn, /, *args, **kwargs):

self._work_queue.put(w)
self._adjust_thread_count()
if self._shutdown or _shutdown:
f.cancel()
w.task = None # Clear reference to task arguments to avoid memory leak
raise RuntimeError('cannot schedule new futures after shutdown')
Comment on lines +216 to +219
return f
submit.__doc__ = _base.Executor.submit.__doc__

Expand Down Expand Up @@ -252,6 +256,15 @@ def _initializer_failed(self):
work_item.future.set_exception(self.BROKEN(self._broken))

def shutdown(self, wait=True, *, cancel_futures=False):
# Detect if we are called reentrantly (e.g. from a signal handler on a thread
# already holding self._shutdown_lock). Fallback if RLock does not support _is_owned.
reentrant = False
if hasattr(self._shutdown_lock, '_is_owned'):
try:
reentrant = self._shutdown_lock._is_owned()
except Exception:
pass

with self._shutdown_lock:
self._shutdown = True
if cancel_futures:
Expand All @@ -268,7 +281,10 @@ def shutdown(self, wait=True, *, cancel_futures=False):
# Send a wake-up to prevent threads calling
# _work_queue.get(block=True) from permanently blocking.
self._work_queue.put(None)
if wait:

# If we are reentrant, we cannot join threads synchronously because the current
Comment on lines 283 to +285
# thread is interrupted and blocking it would cause a deadlock.
if wait and not reentrant:
for t in self._threads:
t.join()
Comment on lines +287 to 289
shutdown.__doc__ = _base.Executor.shutdown.__doc__
Loading