@@ -192,7 +192,7 @@ def __init__(self, max_workers=None, thread_name_prefix='',
192192 self ._threads = set ()
193193 self ._broken = False
194194 self ._shutdown = False
195- self ._shutdown_lock = threading .Lock ()
195+ self ._shutdown_lock = threading .RLock ()
196196 self ._thread_name_prefix = (thread_name_prefix or
197197 ("ThreadPoolExecutor-%d" % self ._counter ()))
198198
@@ -213,6 +213,10 @@ def submit(self, fn, /, *args, **kwargs):
213213
214214 self ._work_queue .put (w )
215215 self ._adjust_thread_count ()
216+ if self ._shutdown or _shutdown :
217+ f .cancel ()
218+ w .task = None # Clear reference to task arguments to avoid memory leak
219+ raise RuntimeError ('cannot schedule new futures after shutdown' )
216220 return f
217221 submit .__doc__ = _base .Executor .submit .__doc__
218222
@@ -252,6 +256,15 @@ def _initializer_failed(self):
252256 work_item .future .set_exception (self .BROKEN (self ._broken ))
253257
254258 def shutdown (self , wait = True , * , cancel_futures = False ):
259+ # Detect if we are called reentrantly (e.g. from a signal handler on a thread
260+ # already holding self._shutdown_lock). Fallback if RLock does not support _is_owned.
261+ reentrant = False
262+ if hasattr (self ._shutdown_lock , '_is_owned' ):
263+ try :
264+ reentrant = self ._shutdown_lock ._is_owned ()
265+ except Exception :
266+ pass
267+
255268 with self ._shutdown_lock :
256269 self ._shutdown = True
257270 if cancel_futures :
@@ -268,7 +281,10 @@ def shutdown(self, wait=True, *, cancel_futures=False):
268281 # Send a wake-up to prevent threads calling
269282 # _work_queue.get(block=True) from permanently blocking.
270283 self ._work_queue .put (None )
271- if wait :
284+
285+ # If we are reentrant, we cannot join threads synchronously because the current
286+ # thread is interrupted and blocking it would cause a deadlock.
287+ if wait and not reentrant :
272288 for t in self ._threads :
273289 t .join ()
274290 shutdown .__doc__ = _base .Executor .shutdown .__doc__
0 commit comments