Skip to content

Commit a904326

Browse files
committed
Change Distributed.cluster_manager from a global non-constant ClusterManager to a global constant Ref{ClusterManager} (#177)
(cherry picked from commit 2fe1aa4e267517565e99cd06664550dcd230cfc6)
1 parent 9ed1027 commit a904326

2 files changed

Lines changed: 17 additions & 6 deletions

File tree

src/cluster.jl

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,14 @@ Cluster managers implement how workers can be added, removed and communicated wi
99
"""
1010
abstract type ClusterManager end
1111

12+
# cluster_manager is a global constant
13+
const cluster_manager = Ref{ClusterManager}()
14+
15+
function throw_if_cluster_manager_unassigned()
16+
isassigned(cluster_manager) || error("cluster_manager is unassigned")
17+
return nothing
18+
end
19+
1220
"""
1321
WorkerConfig
1422
@@ -390,8 +398,7 @@ function init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClus
390398

391399
# On workers, the default cluster manager connects via TCP sockets. Custom
392400
# transports will need to call this function with their own manager.
393-
global cluster_manager
394-
cluster_manager = manager
401+
cluster_manager[] = manager
395402

396403
# Since our pid has yet to be set, ensure no RemoteChannel / Future have been created or addprocs() called.
397404
@assert nprocs() <= 1

src/process_messages.jl

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -315,8 +315,10 @@ function handle_msg(msg::ResultMsg, header, r_stream, w_stream, version)
315315
end
316316

317317
function handle_msg(msg::IdentifySocketMsg, header, r_stream, w_stream, version)
318+
throw_if_cluster_manager_unassigned()
319+
318320
# register a new peer worker connection
319-
w = Worker(msg.from_pid, r_stream, w_stream, cluster_manager; version=version)::Worker
321+
w = Worker(msg.from_pid, r_stream, w_stream, cluster_manager[]; version=version)::Worker
320322
send_connection_hdr(w, false)
321323
send_msg_now(w, MsgHeader(), IdentifySocketAckMsg())
322324
notify(w.initialized)
@@ -328,8 +330,10 @@ function handle_msg(msg::IdentifySocketAckMsg, header, r_stream, w_stream, versi
328330
end
329331

330332
function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version)
333+
throw_if_cluster_manager_unassigned()
334+
331335
LPROC.id = msg.self_pid
332-
controller = Worker(1, r_stream, w_stream, cluster_manager; version=version)::Worker
336+
controller = Worker(1, r_stream, w_stream, cluster_manager[]; version=version)::Worker
333337
notify(controller.initialized)
334338
register_worker(LPROC)
335339
topology(msg.topology)
@@ -348,9 +352,9 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version)
348352
let rpid=rpid, wconfig=wconfig
349353
if lazy
350354
# The constructor registers the object with a global registry.
351-
Worker(rpid, ()->connect_to_peer(cluster_manager, rpid, wconfig))
355+
Worker(rpid, ()->connect_to_peer(cluster_manager[], rpid, wconfig))
352356
else
353-
@async connect_to_peer(cluster_manager, rpid, wconfig)
357+
@async connect_to_peer(cluster_manager[], rpid, wconfig)
354358
end
355359
end
356360
end

0 commit comments

Comments
 (0)