@@ -1294,10 +1294,21 @@ def broadcast(self, message: bytes) -> None:
12941294
12951295 def wait_ack (self ) -> None :
12961296 """Wait for an ack from all workers."""
1297- for worker in self .workers :
1298- buf = receive ( worker . conn )
1297+ for idx in range ( len ( self .workers )) :
1298+ buf = self . receive_worker_message ( idx )
12991299 assert read_tag (buf ) == ACK_MESSAGE
13001300
1301+ def receive_worker_message (self , idx : int ) -> ReadBuffer :
1302+ """Receive a single message from a worker, with crash diagnostics."""
1303+ try :
1304+ return receive (self .workers [idx ].conn )
1305+ except OSError as exc :
1306+ exit_code = self .workers [idx ].proc .poll ()
1307+ exit_status = f"exit code { exit_code } " if exit_code is not None else "still running"
1308+ raise OSError (
1309+ f"Worker { idx } disconnected before sending data ({ exit_status } )"
1310+ ) from exc
1311+
13011312 def submit (self , graph : Graph , sccs : list [SCC ]) -> None :
13021313 """Submit a stale SCC for processing in current process or parallel workers."""
13031314 if self .workers :
@@ -1367,7 +1378,7 @@ def wait_for_done_workers(
13671378 ready = ready_to_read ([w .conn for w in self .workers ], WORKER_DONE_TIMEOUT )
13681379 t1 = time .time ()
13691380 for idx in ready :
1370- buf = receive ( self .workers [ idx ]. conn )
1381+ buf = self .receive_worker_message ( idx )
13711382 assert read_tag (buf ) == SCC_RESPONSE_MESSAGE
13721383 data = SccResponseMessage .read (buf )
13731384 if not data .is_interface :
0 commit comments