Skip to content

Commit d8f44f5

Browse files
authored
Merge pull request #25 from JuliaParallel/distributed-fixes
Distributed fixes
2 parents fbca46f + 9c35115 commit d8f44f5

File tree

8 files changed

+105
-42
lines changed

8 files changed

+105
-42
lines changed

.github/workflows/ci.yml

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,18 @@ jobs:
2828
- '1.9'
2929
os:
3030
- ubuntu-latest
31-
- macOS-latest
3231
- windows-latest
3332
arch:
3433
- x64
3534
- x86
36-
exclude:
35+
include:
36+
- os: macOS-latest
37+
arch: aarch64
38+
version: '1'
3739
- os: macOS-latest
38-
arch: x86
40+
arch: aarch64
41+
version: 'nightly'
42+
exclude:
3943
- os: windows-latest # Killing workers doesn't work on windows in 1.9
4044
version: '1.9'
4145

@@ -58,7 +62,7 @@ jobs:
5862
- uses: julia-actions/julia-buildpkg@v1
5963
- uses: julia-actions/julia-runtest@v1
6064
env:
61-
JULIA_NUM_THREADS: 4
65+
JULIA_NUM_THREADS: 4,4
6266
- uses: julia-actions/julia-processcoverage@v1
6367
- uses: codecov/codecov-action@v5
6468
with:

Project.toml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,22 @@ Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
88
Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"
99

1010
[compat]
11+
Aqua = "0.8"
1112
Distributed = "1"
13+
LibSSH = "0.7"
14+
LinearAlgebra = "1"
1215
Random = "1"
1316
Serialization = "1"
1417
Sockets = "1"
18+
Test = "1"
1519
julia = "1.9"
1620

1721
[extras]
22+
Aqua = "4c88cf16-eb10-579e-8560-4a9242c79595"
1823
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
1924
LibSSH = "00483490-30f8-4353-8aba-35b82f51f4d0"
2025
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
2126
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
2227

2328
[targets]
24-
test = ["LinearAlgebra", "Test", "LibSSH", "Distributed"]
29+
test = ["Aqua", "Distributed", "LibSSH", "LinearAlgebra", "Test"]

src/cluster.jl

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -777,22 +777,17 @@ function redirect_output_from_additional_worker(pid, port)
777777
end
778778

779779
function check_master_connect()
780-
timeout = worker_timeout() * 1e9
781780
# If we do not have at least process 1 connect to us within timeout
782781
# we log an error and exit, unless we're running on valgrind
783782
if ccall(:jl_running_on_valgrind,Cint,()) != 0
784783
return
785784
end
786785

787786
errormonitor(
788-
@async begin
789-
start = time_ns()
790-
while !haskey(map_pid_wrkr, 1) && (time_ns() - start) < timeout
791-
sleep(1.0)
792-
end
793-
794-
if !haskey(map_pid_wrkr, 1)
795-
print(stderr, "Master process (id 1) could not connect within $(timeout/1e9) seconds.\nexiting.\n")
787+
Threads.@spawn begin
788+
timeout = worker_timeout()
789+
if timedwait(() -> haskey(map_pid_wrkr, 1), timeout) === :timed_out
790+
print(stderr, "Master process (id 1) could not connect within $(timeout) seconds.\nexiting.\n")
796791
exit(1)
797792
end
798793
end

src/clusterserialize.jl

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,10 +167,21 @@ function deserialize_global_from_main(s::ClusterSerializer, sym)
167167
return nothing
168168
end
169169
end
170-
Core.eval(Main, Expr(:global, sym))
170+
171171
if sym_isconst
172-
ccall(:jl_set_const, Cvoid, (Any, Any, Any), Main, sym, v)
172+
@static if VERSION >= v"1.12"
173+
# Note that the post-lowering const form is not allowed in value
174+
# position, so there needs to be a dummy `nothing` argument to drop the
175+
# return value.
176+
Core.eval(Main, Expr(:block,
177+
Expr(:const, GlobalRef(Main, sym), v),
178+
nothing))
179+
else
180+
Core.eval(Main, Expr(:global, sym))
181+
ccall(:jl_set_const, Cvoid, (Any, Any, Any), Main, sym, v)
182+
end
173183
else
184+
Core.eval(Main, Expr(:global, sym))
174185
invokelatest(setglobal!, Main, sym, v)
175186
end
176187
return nothing

src/managers.jl

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ addprocs([
126126
127127
* `exeflags`: additional flags passed to the worker processes. It can either be a `Cmd`, a `String`
128128
holding one flag, or a collection of strings, with one element per flag.
129-
E.g. `\`--threads=auto project=.\``, `"--compile-trace=stderr"` or `["--threads=auto", "--compile=all"]`.
129+
E.g. `\`--threads=auto --project=.\``, `"--compile-trace=stderr"` or `["--threads=auto", "--compile=all"]`.
130130
131131
* `topology`: Specifies how the workers connect to each other. Sending a message between
132132
unconnected workers results in an error.
@@ -767,7 +767,8 @@ function kill(manager::SSHManager, pid::Int, config::WorkerConfig)
767767
nothing
768768
end
769769

770-
function kill(manager::LocalManager, pid::Int, config::WorkerConfig; exit_timeout = 15, term_timeout = 15)
770+
function kill(manager::LocalManager, pid::Int, config::WorkerConfig; profile_wait = 6, exit_timeout = 15, term_timeout = 15)
771+
# The default `profile_wait` of 6 seconds allows 1s for the profile and 5s for the report to show
771772
# First, try sending `exit()` to the remote over the usual control channels
772773
remote_do(exit, pid)
773774

@@ -776,7 +777,14 @@ function kill(manager::LocalManager, pid::Int, config::WorkerConfig; exit_timeou
776777

777778
# Check to see if our child exited, and if not, send an actual kill signal
778779
if !process_exited(config.process)
779-
@warn("Failed to gracefully kill worker $(pid), sending SIGQUIT")
780+
@warn "Failed to gracefully kill worker $(pid)"
781+
profile_sig = Sys.iswindows() ? nothing : Sys.isbsd() ? ("SIGINFO", 29) : ("SIGUSR1" , 10)
782+
if profile_sig !== nothing
783+
@warn("Sending profile $(profile_sig[1]) to worker $(pid)")
784+
kill(config.process, profile_sig[2])
785+
sleep(profile_wait)
786+
end
787+
@warn("Sending SIGQUIT to worker $(pid)")
780788
kill(config.process, Base.SIGQUIT)
781789

782790
sleep(term_timeout)

src/workerpool.jl

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ function remotecall_pool(rc_f::typeof(remotecall), f, pool::AbstractWorkerPool,
149149

150150
t = Threads.@spawn try
151151
wait(x)
152+
catch # just wait, ignore errors here
152153
finally
153154
put!(pool, worker)
154155
end
@@ -400,3 +401,29 @@ function remotecall_pool(rc_f, f, pool::CachingPool, args...; kwargs...)
400401
put!(pool, worker)
401402
end
402403
end
404+
405+
# Specialization for remotecall. We have to wait for the Future it returns
406+
# before putting the worker back in the pool.
407+
function remotecall_pool(rc_f::typeof(remotecall), f, pool::CachingPool, args...; kwargs...)
408+
worker = take!(pool)
409+
f_ref = get(pool.map_obj2ref, (worker, f), (f, RemoteChannel(worker)))
410+
isa(f_ref, Tuple) && (pool.map_obj2ref[(worker, f)] = f_ref[2]) # Add to tracker
411+
412+
local x
413+
try
414+
x = rc_f(exec_from_cache, worker, f_ref, args...; kwargs...)
415+
catch
416+
put!(pool, worker)
417+
rethrow()
418+
end
419+
420+
t = Threads.@spawn Threads.threadpool() try
421+
wait(x)
422+
catch # just wait, ignore errors here
423+
finally
424+
put!(pool, worker)
425+
end
426+
errormonitor(t)
427+
428+
return x
429+
end

test/distributed_exec.jl

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ end
243243
end
244244
end
245245
end
246-
@test testval == 1
246+
@test timedwait(() -> testval == 1, 10) == :ok
247247

248248
# Issue number #25847
249249
@everywhere function f25847(ref)
@@ -722,6 +722,8 @@ end
722722
@test nworkers() == length(unique(remotecall_fetch(wp->pmap(_->myid(), wp, 1:100), id_other, wp)))
723723
wp = WorkerPool(2:3)
724724
@test sort(unique(pmap(_->myid(), wp, 1:100))) == [2,3]
725+
@test fetch(remotecall(myid, wp)) in wp.workers
726+
@test_throws RemoteException fetch(remotecall(error, wp))
725727

726728
# wait on worker pool
727729
wp = WorkerPool(2:2)
@@ -747,6 +749,8 @@ end
747749
# CachingPool tests
748750
wp = CachingPool(workers())
749751
@test [1:100...] == pmap(x->x, wp, 1:100)
752+
@test fetch(remotecall(myid, wp)) in wp.workers
753+
@test_throws RemoteException fetch(remotecall(error, wp))
750754

751755
clear!(wp)
752756
@test length(wp.map_obj2ref) == 0
@@ -1017,15 +1021,19 @@ f16091b = () -> 1
10171021
# Test the behaviour of remotecall(f, ::AbstractWorkerPool), this should
10181022
# keep the worker out of the pool until the underlying remotecall has
10191023
# finished.
1020-
remotechan = RemoteChannel(wrkr1)
1021-
pool = WorkerPool([wrkr1])
1022-
put_future = remotecall(() -> wait(remotechan), pool)
1023-
@test !isready(pool)
1024-
put!(remotechan, 1)
1025-
wait(put_future)
1026-
# The task that waits on the future to put it back into the pool runs
1027-
# asynchronously so we use timedwait() to check when the worker is back in.
1028-
@test timedwait(() -> isready(pool), 10) === :ok
1024+
for PoolType in (WorkerPool, CachingPool)
1025+
let
1026+
remotechan = RemoteChannel(wrkr1)
1027+
pool = PoolType([wrkr1])
1028+
put_future = remotecall(() -> wait(remotechan), pool)
1029+
@test !isready(pool)
1030+
put!(remotechan, 1)
1031+
wait(put_future)
1032+
# The task that waits on the future to put it back into the pool runs
1033+
# asynchronously so we use timedwait() to check when the worker is back in.
1034+
@test timedwait(() -> isready(pool), 10) === :ok
1035+
end
1036+
end
10291037

10301038
# Test calling @everywhere from a module not defined on the workers
10311039
LocalBar.bar()
@@ -1707,18 +1715,17 @@ end
17071715
end
17081716

17091717
# Ensure that the code has indeed been successfully executed everywhere
1710-
@test all(in(results), procs())
1718+
return all(in(results), procs())
17111719
end
17121720

17131721
# Test that the client port is reused. SO_REUSEPORT may not be supported on
17141722
# all UNIX platforms, Linux kernels prior to 3.9 and older versions of OSX
17151723
@assert nprocs() == 1
17161724
addprocs_with_testenv(4; lazy=false)
1717-
if ccall(:jl_has_so_reuseport, Int32, ()) == 1
1718-
reuseport_tests()
1719-
else
1720-
@info "SO_REUSEPORT is unsupported, skipping reuseport tests"
1721-
end
1725+
1726+
skip_reuseexport = ccall(:jl_has_so_reuseport, Int32, ()) != 1
1727+
skip_reuseexport && @debug "SO_REUSEPORT support missing, reuseport_tests skipped"
1728+
@test reuseport_tests() skip = skip_reuseexport
17221729
end
17231730

17241731
@testset "Even more various individual issues" begin
@@ -1848,19 +1855,19 @@ end
18481855
end
18491856
"""
18501857
cmd = setenv(`$(julia) --project=$(project) -e $(testcode * extracode)`, env)
1851-
@test success(cmd)
1858+
@test success(pipeline(cmd; stdout, stderr))
18521859
# JULIA_PROJECT
18531860
cmd = setenv(`$(julia) -e $(testcode * extracode)`,
18541861
(env["JULIA_PROJECT"] = project; env))
1855-
@test success(cmd)
1862+
@test success(pipeline(cmd; stdout, stderr))
18561863
# Pkg.activate(...)
18571864
activateish = """
18581865
Base.ACTIVE_PROJECT[] = $(repr(project))
18591866
using DistributedNext
18601867
addprocs(1)
18611868
"""
18621869
cmd = setenv(`$(julia) -e $(activateish * testcode * extracode)`, env)
1863-
@test success(cmd)
1870+
@test success(pipeline(cmd; stdout, stderr))
18641871
# JULIA_(LOAD|DEPOT)_PATH
18651872
shufflecode = """
18661873
d = reverse(DEPOT_PATH)
@@ -1879,7 +1886,7 @@ end
18791886
end
18801887
"""
18811888
cmd = setenv(`$(julia) -e $(shufflecode * addcode * testcode * extracode)`, env)
1882-
@test success(cmd)
1889+
@test success(pipeline(cmd; stdout, stderr))
18831890
# Mismatch when shuffling after proc addition. Note that the use of
18841891
# `addcode` mimics the behaviour of -p1 as the first worker is started
18851892
# before `shufflecode` executes.
@@ -1891,7 +1898,7 @@ end
18911898
end
18921899
"""
18931900
cmd = setenv(`$(julia) -e $(failcode)`, env)
1894-
@test success(cmd)
1901+
@test success(pipeline(cmd; stdout, stderr))
18951902

18961903
# Hideous hack to double escape path separators on Windows so that it gets
18971904
# interpolated into the string (and then Cmd) correctly.
@@ -1918,7 +1925,7 @@ end
19181925
end
19191926
"""
19201927
cmd = setenv(`$(julia) -e $(envcode)`, env)
1921-
@test success(cmd)
1928+
@test success(pipeline(cmd; stdout, stderr))
19221929
end end
19231930
end
19241931

@@ -1935,7 +1942,7 @@ include("splitrange.jl")
19351942

19361943
# Next, ensure we get a log message when a worker does not cleanly exit
19371944
w = only(addprocs(1))
1938-
@test_logs (:warn, r"sending SIGQUIT") begin
1945+
@test_logs (:warn, r"Sending SIGQUIT") match_mode=:any begin
19391946
remote_do(w) do
19401947
# Cause the 'exit()' message that `rmprocs()` sends to do nothing
19411948
Core.eval(Base, :(exit() = nothing))

test/runtests.jl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# This file is a part of Julia. License is MIT: https://julialang.org/license
22

33
using Test
4+
import DistributedNext
5+
import Aqua
46

57
# Run the distributed test outside of the main driver since it needs its own
68
# set of dedicated workers.
@@ -22,3 +24,7 @@ include("distributed_exec.jl")
2224
include("managers.jl")
2325

2426
include("distributed_stdlib_detection.jl")
27+
28+
@testset "Aqua" begin
29+
Aqua.test_all(DistributedNext)
30+
end

0 commit comments

Comments
 (0)