2929from collections .abc import Callable , Iterator , Mapping , Sequence , Set as AbstractSet
3030from heapq import heappop , heappush
3131from textwrap import dedent
32+ from threading import Thread
3233from typing import (
3334 TYPE_CHECKING ,
3435 Any ,
@@ -371,6 +372,7 @@ def default_flush_errors(
371372 extra_plugins = extra_plugins or []
372373
373374 workers = []
375+ connect_threads = []
374376 if options .num_workers > 0 :
375377 # TODO: switch to something more efficient than pickle (also in the daemon).
376378 pickled_options = pickle .dumps (options .snapshot ())
@@ -383,10 +385,17 @@ def default_flush_errors(
383385 buf = WriteBuffer ()
384386 sources_message .write (buf )
385387 sources_data = buf .getvalue ()
388+
def connect(wc: WorkerClient, data: bytes) -> None:
    # Push the serialized sources to a worker the moment it comes up,
    # so it can start loading them while the coordinator keeps going.
    wc.connect()
    wc.conn.write_bytes(data)
394+ # We don't wait for workers to be ready until they are actually needed.
386395 for worker in workers :
387- # Start loading graph in each worker as soon as it is up.
388- worker . connect ()
389- worker . conn . write_bytes ( sources_data )
396+ thread = Thread ( target = connect , args = ( worker , sources_data ))
397+ thread . start ()
398+ connect_threads . append ( thread )
390399
391400 try :
392401 result = build_inner (
@@ -399,6 +408,7 @@ def default_flush_errors(
399408 stderr ,
400409 extra_plugins ,
401410 workers ,
411+ connect_threads ,
402412 )
403413 result .errors = messages
404414 return result
@@ -412,6 +422,10 @@ def default_flush_errors(
412422 e .messages = messages
413423 raise
414424 finally :
425+ # In case of an early crash it is better to wait for workers to become ready, and
426+ # shut them down cleanly. Otherwise, they will linger until connection timeout.
427+ for thread in connect_threads :
428+ thread .join ()
415429 for worker in workers :
416430 try :
417431 send (worker .conn , SccRequestMessage (scc_id = None , import_errors = {}, mod_data = {}))
@@ -431,6 +445,7 @@ def build_inner(
431445 stderr : TextIO ,
432446 extra_plugins : Sequence [Plugin ],
433447 workers : list [WorkerClient ],
448+ connect_threads : list [Thread ],
434449) -> BuildResult :
435450 if platform .python_implementation () == "CPython" :
436451 # Run gc less frequently, as otherwise we can spend a large fraction of
@@ -486,7 +501,7 @@ def build_inner(
486501
487502 reset_global_state ()
488503 try :
489- graph = dispatch (sources , manager , stdout )
504+ graph = dispatch (sources , manager , stdout , connect_threads )
490505 if not options .fine_grained_incremental :
491506 type_state .reset_all_subtype_caches ()
492507 if options .timing_stats is not None :
@@ -496,9 +511,7 @@ def build_inner(
496511 warn_unused_configs (options , flush_errors )
497512 return BuildResult (manager , graph )
498513 finally :
499- t0 = time .time ()
500- manager .metastore .commit ()
501- manager .add_stats (cache_commit_time = time .time () - t0 )
514+ manager .commit ()
502515 manager .log (
503516 "Build finished in %.3f seconds with %d modules, and %d errors"
504517 % (
@@ -1119,6 +1132,11 @@ def report_file(
11191132 if self .reports is not None and self .source_set .is_source (file ):
11201133 self .reports .file (file , self .modules , type_map , options )
11211134
def commit(self) -> None:
    """Commit the metadata store and record how long the commit took."""
    start = time.time()
    self.metastore.commit()
    elapsed = time.time() - start
    self.add_stats(cache_commit_time=elapsed)
11221140 def verbosity (self ) -> int :
11231141 return self .options .verbosity
11241142
@@ -1156,6 +1174,24 @@ def add_stats(self, **kwds: Any) -> None:
11561174 def stats_summary (self ) -> Mapping [str , object ]:
11571175 return self .stats
11581176
def broadcast(self, message: bytes) -> None:
    """Write the same message to every worker connection concurrently.

    One thread per worker performs the write; we block until all writes
    complete, and record the total elapsed time in the build stats.
    """
    start = time.time()
    senders = [
        Thread(target=w.conn.write_bytes, args=(message,)) for w in self.workers
    ]
    for sender in senders:
        sender.start()
    for sender in senders:
        sender.join()
    self.add_stats(broadcast_time=time.time() - start)
def wait_ack(self) -> None:
    """Block until every worker has acknowledged the previous message."""
    for w in self.workers:
        reply = receive(w.conn)
        # Each worker must answer with exactly one ACK before we proceed.
        assert read_tag(reply) == ACK_MESSAGE
11591195 def submit (self , graph : Graph , sccs : list [SCC ]) -> None :
11601196 """Submit a stale SCC for processing in current process or parallel workers."""
11611197 if self .workers :
@@ -1176,6 +1212,7 @@ def submit_to_workers(self, graph: Graph, sccs: list[SCC] | None = None) -> None
11761212 for mod_id in scc .mod_ids
11771213 if (path := graph [mod_id ].xpath ) in self .errors .recorded
11781214 }
1215+ t0 = time .time ()
11791216 send (
11801217 self .workers [idx ].conn ,
11811218 SccRequestMessage (
@@ -1193,6 +1230,7 @@ def submit_to_workers(self, graph: Graph, sccs: list[SCC] | None = None) -> None
11931230 },
11941231 ),
11951232 )
1233+ self .add_stats (scc_send_time = time .time () - t0 )
11961234
11971235 def wait_for_done (
11981236 self , graph : Graph
@@ -1221,7 +1259,10 @@ def wait_for_done_workers(
12211259
12221260 done_sccs = []
12231261 results = {}
1224- for idx in ready_to_read ([w .conn for w in self .workers ], WORKER_DONE_TIMEOUT ):
1262+ t0 = time .time ()
1263+ ready = ready_to_read ([w .conn for w in self .workers ], WORKER_DONE_TIMEOUT )
1264+ t1 = time .time ()
1265+ for idx in ready :
12251266 buf = receive (self .workers [idx ].conn )
12261267 assert read_tag (buf ) == SCC_RESPONSE_MESSAGE
12271268 data = SccResponseMessage .read (buf )
@@ -1232,6 +1273,7 @@ def wait_for_done_workers(
12321273 assert data .result is not None
12331274 results .update (data .result )
12341275 done_sccs .append (self .scc_by_id [scc_id ])
1276+ self .add_stats (scc_wait_time = t1 - t0 , scc_receive_time = time .time () - t1 )
12351277 self .submit_to_workers (graph ) # advance after some workers are free.
12361278 return (
12371279 done_sccs ,
@@ -3685,7 +3727,12 @@ def log_configuration(manager: BuildManager, sources: list[BuildSource]) -> None
36853727# The driver
36863728
36873729
3688- def dispatch (sources : list [BuildSource ], manager : BuildManager , stdout : TextIO ) -> Graph :
3730+ def dispatch (
3731+ sources : list [BuildSource ],
3732+ manager : BuildManager ,
3733+ stdout : TextIO ,
3734+ connect_threads : list [Thread ],
3735+ ) -> Graph :
36893736 log_configuration (manager , sources )
36903737
36913738 t0 = time .time ()
@@ -3742,7 +3789,7 @@ def dispatch(sources: list[BuildSource], manager: BuildManager, stdout: TextIO)
37423789 dump_graph (graph , stdout )
37433790 return graph
37443791
3745- # Fine grained dependencies that didn't have an associated module in the build
3792+ # Fine- grained dependencies that didn't have an associated module in the build
37463793 # are serialized separately, so we read them after we load the graph.
37473794 # We need to read them both for running in daemon mode and if we are generating
37483795 # a fine-grained cache (so that we can properly update them incrementally).
@@ -3755,25 +3802,28 @@ def dispatch(sources: list[BuildSource], manager: BuildManager, stdout: TextIO)
37553802 if fg_deps_meta is not None :
37563803 manager .fg_deps_meta = fg_deps_meta
37573804 elif manager .stats .get ("fresh_metas" , 0 ) > 0 :
3758- # Clear the stats so we don't infinite loop because of positive fresh_metas
3805+ # Clear the stats, so we don't infinite loop because of positive fresh_metas
37593806 manager .stats .clear ()
37603807 # There were some cache files read, but no fine-grained dependencies loaded.
37613808 manager .log ("Error reading fine-grained dependencies cache -- aborting cache load" )
37623809 manager .cache_enabled = False
37633810 manager .log ("Falling back to full run -- reloading graph..." )
3764- return dispatch (sources , manager , stdout )
3811+ return dispatch (sources , manager , stdout , connect_threads )
37653812
37663813 # If we are loading a fine-grained incremental mode cache, we
37673814 # don't want to do a real incremental reprocess of the
37683815 # graph---we'll handle it all later.
37693816 if not manager .use_fine_grained_cache ():
3817+ # Wait for workers since they may be needed at this point.
3818+ for thread in connect_threads :
3819+ thread .join ()
37703820 process_graph (graph , manager )
37713821 # Update plugins snapshot.
37723822 write_plugins_snapshot (manager )
37733823 manager .old_plugins_snapshot = manager .plugins_snapshot
37743824 if manager .options .cache_fine_grained or manager .options .fine_grained_incremental :
3775- # If we are running a daemon or are going to write cache for further fine grained use,
3776- # then we need to collect fine grained protocol dependencies.
3825+ # If we are running a daemon or are going to write cache for further fine- grained use,
3826+ # then we need to collect fine- grained protocol dependencies.
37773827 # Since these are a global property of the program, they are calculated after we
37783828 # processed the whole graph.
37793829 type_state .add_all_protocol_deps (manager .fg_deps )
@@ -4166,10 +4216,8 @@ def process_graph(graph: Graph, manager: BuildManager) -> None:
41664216 buf = WriteBuffer ()
41674217 graph_message .write (buf )
41684218 graph_data = buf .getvalue ()
4169- for worker in manager .workers :
4170- buf = receive (worker .conn )
4171- assert read_tag (buf ) == ACK_MESSAGE
4172- worker .conn .write_bytes (graph_data )
4219+ manager .wait_ack ()
4220+ manager .broadcast (graph_data )
41734221
41744222 sccs = sorted_components (graph )
41754223 manager .log (
@@ -4187,13 +4235,9 @@ def process_graph(graph: Graph, manager: BuildManager) -> None:
41874235 buf = WriteBuffer ()
41884236 sccs_message .write (buf )
41894237 sccs_data = buf .getvalue ()
4190- for worker in manager .workers :
4191- buf = receive (worker .conn )
4192- assert read_tag (buf ) == ACK_MESSAGE
4193- worker .conn .write_bytes (sccs_data )
4194- for worker in manager .workers :
4195- buf = receive (worker .conn )
4196- assert read_tag (buf ) == ACK_MESSAGE
4238+ manager .wait_ack ()
4239+ manager .broadcast (sccs_data )
4240+ manager .wait_ack ()
41974241
41984242 manager .free_workers = set (range (manager .options .num_workers ))
41994243
0 commit comments