@@ -57,38 +57,46 @@ def _ordered_generator(
5757 ) -> Iterator [list [Out ]]:
5858 """Generate results in their original order."""
5959 futures = deque ()
60+ executor_shutdown = False
6061
6162 # Pre-submit initial batch of futures
6263 for _ in range (self .max_workers + 1 ):
64+ if executor_shutdown :
65+ break
6366 try :
6467 chunk = next (chunks_iter )
6568 futures .append (executor .submit (_worker_process_chunk , transformer , context_handle , chunk ))
6669 except StopIteration :
6770 break
6871 except RuntimeError as e :
6972 if "cannot schedule new futures after shutdown" in str (e ):
73+ executor_shutdown = True
7074 break
7175 raise
7276
7377 while futures :
7478 try :
7579 yield futures .popleft ().result ()
7680
77- # Try to submit the next chunk
78- try :
79- chunk = next (chunks_iter )
80- futures .append (executor .submit (_worker_process_chunk , transformer , context_handle , chunk ))
81- except StopIteration :
82- continue
83- except RuntimeError as e :
84- if "cannot schedule new futures after shutdown" in str (e ):
85- # Executor is shut down, stop submitting new work
86- break
87- raise
81+ # Try to submit the next chunk only if executor is not shutdown
82+ if not executor_shutdown :
83+ try :
84+ chunk = next (chunks_iter )
85+ futures .append (executor .submit (_worker_process_chunk , transformer , context_handle , chunk ))
86+ except StopIteration :
87+ continue
88+ except RuntimeError as e :
89+ if "cannot schedule new futures after shutdown" in str (e ):
90+ executor_shutdown = True
91+ continue
92+ raise
8893 except Exception :
8994 # Cancel remaining futures and re-raise
9095 for future in futures :
91- future .cancel ()
96+ try :
97+ future .cancel ()
98+ except Exception :
99+ pass # Ignore cancellation errors
92100 futures .clear ()
93101 raise
94102
@@ -101,37 +109,44 @@ def _unordered_generator(
101109 ) -> Iterator [list [Out ]]:
102110 """Generate results as they complete."""
103111 futures = set ()
112+ executor_shutdown = False
104113
105114 # Pre-submit initial batch
106- try :
107- for chunk in itertools .islice (chunks_iter , self .max_workers + 1 ):
115+ for chunk in itertools .islice (chunks_iter , self .max_workers + 1 ):
116+ if executor_shutdown :
117+ break
118+ try :
108119 futures .add (executor .submit (_worker_process_chunk , transformer , context_handle , chunk ))
109- except RuntimeError as e :
110- if "cannot schedule new futures after shutdown" in str (e ):
111- # If we can't submit any futures, there's nothing to process
112- return
113- raise
120+ except RuntimeError as e :
121+ if "cannot schedule new futures after shutdown" in str (e ):
122+ executor_shutdown = True
123+ break
124+ raise
114125
115126 while futures :
116127 try :
117128 done , futures = wait (futures , return_when = FIRST_COMPLETED )
118129 for future in done :
119130 yield future .result ()
120131
121- # Try to submit next chunk if available
122- try :
123- chunk = next (chunks_iter )
124- futures .add (executor .submit (_worker_process_chunk , transformer , context_handle , chunk ))
125- except StopIteration :
126- continue
127- except RuntimeError as e :
128- if "cannot schedule new futures after shutdown" in str (e ):
129- # Executor is shut down, stop submitting new work
130- break
131- raise
132+ # Try to submit next chunk only if executor is not shutdown
133+ if not executor_shutdown :
134+ try :
135+ chunk = next (chunks_iter )
136+ futures .add (executor .submit (_worker_process_chunk , transformer , context_handle , chunk ))
137+ except StopIteration :
138+ continue
139+ except RuntimeError as e :
140+ if "cannot schedule new futures after shutdown" in str (e ):
141+ executor_shutdown = True
142+ continue
143+ raise
132144 except Exception :
133145 # Cancel remaining futures and re-raise
134146 for future in futures :
135- future .cancel ()
147+ try :
148+ future .cancel ()
149+ except Exception :
150+ pass # Ignore cancellation errors
136151 futures .clear ()
137152 raise
0 commit comments