Skip to content

Commit 76cb6b6

Browse files
authored
Merge pull request #33 from JuliaParallel/dpa/jet
CI: Add a CI job that runs the JET tests, and fix multiple JET errors
2 parents 801c564 + 8c2de2a commit 76cb6b6

File tree

10 files changed

+93
-28
lines changed

10 files changed

+93
-28
lines changed

.github/workflows/ci.yml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ concurrency:
1515
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.ref != 'refs/heads/main' || startsWith(github.ref, 'refs/heads/release-') || github.run_number }}
1616
cancel-in-progress: ${{ startsWith(github.ref, 'refs/pull/') }}
1717

18+
permissions:
19+
contents: read
20+
1821
jobs:
1922
test:
2023
runs-on: ${{ matrix.os }}
@@ -51,6 +54,8 @@ jobs:
5154

5255
steps:
5356
- uses: actions/checkout@v6
57+
with:
58+
persist-credentials: false
5459
- uses: julia-actions/setup-julia@v2
5560
with:
5661
version: ${{ matrix.version }}
@@ -102,6 +107,8 @@ jobs:
102107
runs-on: ubuntu-latest
103108
steps:
104109
- uses: actions/checkout@v6
110+
with:
111+
persist-credentials: false
105112
- uses: julia-actions/setup-julia@latest
106113
with:
107114
version: '1'
@@ -111,3 +118,17 @@ jobs:
111118
env:
112119
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
113120
run: julia --project=docs/ docs/make.jl
121+
122+
jet:
123+
runs-on: ubuntu-latest
124+
steps:
125+
- uses: actions/checkout@v6
126+
with:
127+
persist-credentials: false
128+
- uses: julia-actions/setup-julia@latest
129+
with:
130+
version: '1.12'
131+
# version: 'nightly'
132+
- run: julia --color=yes --project=ci/jet -e 'import Pkg; Pkg.instantiate()'
133+
- name: Run the JET tests
134+
run: julia --color=yes --project=ci/jet ci/jet/check.jl

ci/jet/Project.toml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
[deps]
2+
DistributedNext = "fab6aee4-877b-4bac-a744-3eca44acbb6f"
3+
JET = "c3a54625-cd67-489e-a8e7-0a5a0ff4e31b"
4+
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
5+
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
6+
7+
[sources]
8+
DistributedNext = {path = "../.."}
9+
10+
[compat]
11+
JET = "0.11"

ci/jet/check.jl

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
using DistributedNext: DistributedNext
2+
3+
using JET: JET
4+
using Serialization: Serialization
5+
using Test: Test, @testset
6+
7+
# We don't want to fail PkgEval because of a JET failure
8+
# Therefore, we don't put the JET tests in the regular DistributedNext test suite
9+
# Instead, we put them in a separate CI job, which runs on the DistributedNext repo
10+
11+
@testset "JET" begin
12+
ignored_modules = (
13+
# We will ignore Base:
14+
Base,
15+
16+
# We'll ignore the Serialization stdlib:
17+
Serialization,
18+
)
19+
JET.test_package(DistributedNext; ignored_modules)
20+
end

src/cluster.jl

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,14 @@ Cluster managers implement how workers can be added, removed and communicated wi
99
"""
1010
abstract type ClusterManager end
1111

12+
# cluster_manager is a global constant Ref; it stays unassigned until init_worker stores the manager in it
13+
const cluster_manager = Ref{ClusterManager}()
14+
15+
# Guard for code that is about to read `cluster_manager[]`: calls `error` with
# "cluster_manager is unassigned" when the `cluster_manager` Ref holds no value,
# and returns `nothing` otherwise.
function throw_if_cluster_manager_unassigned()
16+
isassigned(cluster_manager) || error("cluster_manager is unassigned")
17+
return nothing
18+
end
19+
1220
"""
1321
WorkerConfig
1422
@@ -390,8 +398,7 @@ function init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClus
390398

391399
# On workers, the default cluster manager connects via TCP sockets. Custom
392400
# transports will need to call this function with their own manager.
393-
global cluster_manager
394-
cluster_manager = manager
401+
cluster_manager[] = manager
395402

396403
# Since our pid has yet to be set, ensure no RemoteChannel / Future have been created or addprocs() called.
397404
@assert nprocs() <= 1
@@ -569,7 +576,7 @@ function setup_launched_worker(manager, wconfig, launched_q)
569576
# same type. This is done by setting an appropriate value to `WorkerConfig.count`.
570577
cnt = something(wconfig.count, 1)
571578
if cnt === :auto
572-
cnt = wconfig.environ[:cpu_threads]
579+
cnt = (wconfig.environ::AbstractDict)[:cpu_threads]
573580
end
574581
cnt = cnt - 1 # Removing self from the requested number
575582

@@ -607,7 +614,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch
607614
end
608615
end
609616

610-
function create_worker(manager, wconfig)
617+
function create_worker(manager::ClusterManager, wconfig::WorkerConfig)
611618
# only node 1 can add new nodes, since nobody else has the full list of address:port
612619
@assert LPROC.id == 1
613620
timeout = worker_timeout()

src/macros.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,7 @@ completion. To wait for completion, prefix the call with [`@sync`](@ref), like :
333333
macro distributed(args...)
334334
na = length(args)
335335
if na==1
336+
reducer = identity
336337
loop = args[1]
337338
elseif na==2
338339
reducer = args[1]

src/managers.jl

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ function launch_on_machine(manager::SSHManager, machine::AbstractString, cnt, pa
347347

348348
any(c -> c == '"', exename) && throw(ArgumentError("invalid exename"))
349349

350-
remotecmd = shell_escape_wincmd(escape_microsoft_c_args(exename, exeflags...))
350+
remotecmd = shell_escape_wincmd(escape_microsoft_c_args(exename, exeflags...)::AbstractString)
351351
# change working directory
352352
if dir !== nothing && dir != ""
353353
any(c -> c == '"', dir) && throw(ArgumentError("invalid dir"))
@@ -553,7 +553,7 @@ end
553553

554554
function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
555555
if op === :interrupt
556-
kill(config.process, 2)
556+
kill(config.process::Process, 2)
557557
end
558558
end
559559

@@ -606,7 +606,7 @@ function connect(manager::ClusterManager, pid::Int, config::WorkerConfig)
606606

607607
# master connecting to workers
608608
if config.io !== nothing
609-
(bind_addr, port::Int) = read_worker_host_port(config.io)
609+
(bind_addr, port::Int) = read_worker_host_port(config.io::IO)
610610
pubhost = something(config.host, bind_addr)
611611
config.host = pubhost
612612
config.port = port
@@ -776,21 +776,22 @@ function kill(manager::LocalManager, pid::Int, config::WorkerConfig; profile_wai
776776
sleep(exit_timeout)
777777

778778
# Check to see if our child exited, and if not, send an actual kill signal
779-
if !process_exited(config.process)
779+
process = config.process::Process
780+
if !process_exited(process)
780781
@warn "Failed to gracefully kill worker $(pid)"
781782
profile_sig = Sys.iswindows() ? nothing : Sys.isbsd() ? ("SIGINFO", 29) : ("SIGUSR1" , 10)
782783
if profile_sig !== nothing
783784
@warn("Sending profile $(profile_sig[1]) to worker $(pid)")
784-
kill(config.process, profile_sig[2])
785+
kill(process, profile_sig[2])
785786
sleep(profile_wait)
786787
end
787788
@warn("Sending SIGQUIT to worker $(pid)")
788-
kill(config.process, Base.SIGQUIT)
789+
kill(process, Base.SIGQUIT)
789790

790791
sleep(term_timeout)
791-
if !process_exited(config.process)
792+
if !process_exited(process)
792793
@warn("Worker $(pid) ignored SIGQUIT, sending SIGKILL")
793-
kill(config.process, Base.SIGKILL)
794+
kill(process, Base.SIGKILL)
794795
end
795796
end
796797
end

src/messages.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,15 +102,15 @@ end
102102
function send_msg(s::IO, header, msg)
103103
id = worker_id_from_socket(s)
104104
if id > -1
105-
return send_msg(worker_from_id(id), header, msg)
105+
return send_msg(worker_from_id(id)::Worker, header, msg)
106106
end
107107
send_msg_unknown(s, header, msg)
108108
end
109109

110110
function send_msg_now(s::IO, header, msg::AbstractMsg)
111111
id = worker_id_from_socket(s)
112112
if id > -1
113-
return send_msg_now(worker_from_id(id), header, msg)
113+
return send_msg_now(worker_from_id(id)::Worker, header, msg)
114114
end
115115
send_msg_unknown(s, header, msg)
116116
end

src/process_messages.jl

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ function run_work_thunk(thunk::Function, print_error::Bool)
7575
end
7676
return result
7777
end
78-
function run_work_thunk(rv::RemoteValue, thunk)
78+
function run_work_thunk_remotevalue(rv::RemoteValue, thunk)
7979
put!(rv, run_work_thunk(thunk, false))
8080
nothing
8181
end
@@ -85,7 +85,7 @@ function schedule_call(rid, thunk)
8585
rv = RemoteValue(def_rv_channel())
8686
(PGRP::ProcessGroup).refs[rid] = rv
8787
push!(rv.clientset, rid.whence)
88-
errormonitor(@async run_work_thunk(rv, thunk))
88+
errormonitor(@async run_work_thunk_remotevalue(rv, thunk))
8989
return rv
9090
end
9191
end
@@ -289,7 +289,7 @@ function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, versi
289289
try
290290
deliver_result(w_stream, :call_fetch, header.notify_oid, v.v)
291291
finally
292-
unlock(v.rv.synctake)
292+
unlock(v.rv.synctake::ReentrantLock)
293293
end
294294
else
295295
deliver_result(w_stream, :call_fetch, header.notify_oid, v)
@@ -315,8 +315,10 @@ function handle_msg(msg::ResultMsg, header, r_stream, w_stream, version)
315315
end
316316

317317
function handle_msg(msg::IdentifySocketMsg, header, r_stream, w_stream, version)
318+
throw_if_cluster_manager_unassigned()
319+
318320
# register a new peer worker connection
319-
w = Worker(msg.from_pid, r_stream, w_stream, cluster_manager; version=version)
321+
w = Worker(msg.from_pid, r_stream, w_stream, cluster_manager[]; version=version)::Worker
320322
send_connection_hdr(w, false)
321323
send_msg_now(w, MsgHeader(), IdentifySocketAckMsg())
322324
notify(w.initialized)
@@ -328,8 +330,10 @@ function handle_msg(msg::IdentifySocketAckMsg, header, r_stream, w_stream, versi
328330
end
329331

330332
function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version)
333+
throw_if_cluster_manager_unassigned()
334+
331335
LPROC.id = msg.self_pid
332-
controller = Worker(1, r_stream, w_stream, cluster_manager; version=version)
336+
controller = Worker(1, r_stream, w_stream, cluster_manager[]; version=version)::Worker
333337
notify(controller.initialized)
334338
register_worker(LPROC)
335339
topology(msg.topology)
@@ -348,9 +352,9 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version)
348352
let rpid=rpid, wconfig=wconfig
349353
if lazy
350354
# The constructor registers the object with a global registry.
351-
Worker(rpid, ()->connect_to_peer(cluster_manager, rpid, wconfig))
355+
Worker(rpid, ()->connect_to_peer(cluster_manager[], rpid, wconfig))
352356
else
353-
@async connect_to_peer(cluster_manager, rpid, wconfig)
357+
@async connect_to_peer(cluster_manager[], rpid, wconfig)
354358
end
355359
end
356360
end
@@ -362,7 +366,7 @@ end
362366
function connect_to_peer(manager::ClusterManager, rpid::Int, wconfig::WorkerConfig)
363367
try
364368
(r_s, w_s) = connect(manager, rpid, wconfig)
365-
w = Worker(rpid, r_s, w_s, manager; config=wconfig)
369+
w = Worker(rpid, r_s, w_s, manager; config=wconfig)::Worker
366370
process_messages(w.r_stream, w.w_stream, false)
367371
send_connection_hdr(w, true)
368372
send_msg_now(w, MsgHeader(), IdentifySocketMsg(myid()))

src/remotecall.jl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -706,8 +706,8 @@ function put_ref(rid, caller, args...)
706706
put!(rv, args...)
707707
if myid() == caller && rv.synctake !== nothing
708708
# Wait till a "taken" value is serialized out - github issue #29932
709-
lock(rv.synctake)
710-
unlock(rv.synctake)
709+
lock(rv.synctake::ReentrantLock)
710+
unlock(rv.synctake::ReentrantLock)
711711
end
712712
nothing
713713
end
@@ -731,15 +731,15 @@ function take_ref(rid, caller, args...)
731731
# special handling for local put! / remote take! on unbuffered channel
732732
# github issue #29932
733733
synctake = true
734-
lock(rv.synctake)
734+
lock(rv.synctake::ReentrantLock)
735735
end
736736

737737
v = try
738738
take!(rv, args...)
739739
catch e
740740
# avoid unmatched unlock when exception occurs
741741
# github issue #33972
742-
synctake && unlock(rv.synctake)
742+
synctake && unlock(rv.synctake::ReentrantLock)
743743
rethrow(e)
744744
end
745745

src/workerpool.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ julia> default_worker_pool()
289289
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
290290
```
291291
"""
292-
function default_worker_pool()
292+
function default_worker_pool()::AbstractWorkerPool
293293
# On workers retrieve the default worker pool from the master when accessed
294294
# for the first time
295295
if _default_worker_pool[] === nothing
@@ -299,7 +299,7 @@ function default_worker_pool()
299299
_default_worker_pool[] = remotecall_fetch(()->default_worker_pool(), 1)
300300
end
301301
end
302-
return _default_worker_pool[]
302+
return _default_worker_pool[]::AbstractWorkerPool
303303
end
304304

305305
"""

0 commit comments

Comments
 (0)