Skip to content

Commit 2621312

Browse files
committed
allow initiating peer to close stream gracefully
I have a use case where I would like to remove a worker without killing its process. Currently, trying to disconnect from the head node causes the message handler loop to throw a fatal exception, so this adds a check that the connection is still open before trying to read new messages.
1 parent f2460b1 commit 2621312

File tree

4 files changed

+117
-1
lines changed

4 files changed

+117
-1
lines changed

src/process_messages.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool)
167167

168168
readbytes!(r_stream, boundary, length(MSG_BOUNDARY))
169169

170-
while true
170+
while !(incoming && eof(r_stream))
171171
reset_state(serializer)
172172
header = deserialize_hdr_raw(r_stream)
173173
# println("header: ", header)

test/persistent_workers.jl

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
include("testhelpers/PersistentWorkers.jl")
2+
using .PersistentWorkers
3+
using Test
4+
using Random
5+
using DistributedNext
6+
7+
@testset "PersistentWorkers.jl" begin
8+
cookie = randstring(16)
9+
port = rand(9128:9999) # TODO: make sure port is available?
10+
helpers_path = joinpath(@__DIR__, "testhelpers", "PersistentWorkers.jl")
11+
cmd = `$(Base.julia_exename()) --startup=no --project=$(Base.active_project()) -L $(helpers_path) -e "using .PersistentWorkers; wait(start_worker_loop($port; cluster_cookie=$(repr(cookie)))[1])"`
12+
worker = run(pipeline(cmd; stdout, stderr); wait=false)
13+
try
14+
@show worker.cmd
15+
cluster_cookie(cookie)
16+
sleep(10)
17+
18+
p = addprocs(PersistentWorkerManager(port))[]
19+
@test procs() == [1, p]
20+
@test workers() == [p]
21+
@test remotecall_fetch(myid, p) == p
22+
rmprocs(p)
23+
@test procs() == [1]
24+
@test workers() == [1]
25+
@test process_running(worker)
26+
# this shouldn't error
27+
@everywhere 1+1
28+
29+
# try the same thing again for the same worker
30+
p = addprocs(PersistentWorkerManager(port))[]
31+
@test procs() == [1, p]
32+
@test workers() == [p]
33+
@test remotecall_fetch(myid, p) == p
34+
rmprocs(p)
35+
@test procs() == [1]
36+
@test workers() == [1]
37+
@test process_running(worker)
38+
# this shouldn't error
39+
@everywhere 1+1
40+
finally
41+
kill(worker)
42+
wait(worker)
43+
end
44+
end

test/runtests.jl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ include("managers.jl")
3434

3535
include("distributed_stdlib_detection.jl")
3636

37+
include("persistent_workers.jl")
38+
3739
@testset "Aqua" begin
3840
Aqua.test_all(DistributedNext)
3941
end
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
module PersistentWorkers
2+
3+
using DistributedNext: DistributedNext, ClusterManager, WorkerConfig, worker_from_id, set_worker_state, W_TERMINATED
4+
using Sockets: InetAddr, localhost
5+
6+
export PersistentWorkerManager, start_worker_loop
7+
8+
struct PersistentWorkerManager{IP} <: ClusterManager
9+
addr::InetAddr{IP}
10+
end
11+
12+
PersistentWorkerManager(host, port::Integer) = PersistentWorkerManager(InetAddr(host, port))
13+
PersistentWorkerManager(port::Integer) = PersistentWorkerManager(localhost, port)
14+
15+
function DistributedNext.launch(cm::PersistentWorkerManager, ::Dict, launched::Array, launch_ntfy::Base.GenericCondition{Base.AlwaysLockedST})
16+
(; host, port) = cm.addr
17+
wc = WorkerConfig()
18+
wc.io = nothing
19+
wc.host = string(host)
20+
wc.bind_addr = string(host)
21+
wc.port = Int(port)
22+
push!(launched, wc)
23+
notify(launch_ntfy)
24+
return nothing
25+
end
26+
27+
function DistributedNext.manage(::PersistentWorkerManager, ::Int, ::WorkerConfig, ::Symbol) end
28+
29+
# don't actually kill the worker, just close the streams
30+
function Base.kill(::PersistentWorkerManager, pid::Int, ::WorkerConfig)
31+
w = worker_from_id(pid)
32+
close(w.r_stream)
33+
close(w.w_stream)
34+
set_worker_state(w, W_TERMINATED)
35+
return nothing
36+
end
37+
38+
using DistributedNext: LPROC, init_worker, process_messages, cluster_cookie
39+
using Sockets: IPAddr, listen, listenany, accept
40+
41+
function start_worker_loop(host::IPAddr, port::Union{Nothing, Integer}; cluster_cookie=cluster_cookie())
42+
init_worker(cluster_cookie)
43+
LPROC.bind_addr = string(host)
44+
if port === nothing
45+
port_hint = 9000 + (getpid() % 1000)
46+
port, sock = listenany(host, UInt16(port_hint))
47+
else
48+
sock = listen(host, port)
49+
end
50+
LPROC.bind_port = port
51+
t = let sock=sock
52+
@async while isopen(sock)
53+
client = accept(sock)
54+
process_messages(client, client, true)
55+
end
56+
end
57+
errormonitor(t)
58+
@info "Listening on $host:$port, cluster_cookie=$cluster_cookie"
59+
return t, host, port
60+
end
61+
62+
function start_worker_loop((; host, port)::InetAddr; cluster_cookie=cluster_cookie())
63+
return start_worker_loop(host, port; cluster_cookie)
64+
end
65+
66+
function start_worker_loop(port::Union{Nothing, Integer}=nothing; cluster_cookie=cluster_cookie())
67+
return start_worker_loop(localhost, port; cluster_cookie)
68+
end
69+
70+
end

0 commit comments

Comments
 (0)