Skip to content

Commit 02b3f81

Browse files
IanButterworthjpsamarootkfvchuravy
authored andcommitted
Add a :dynamic scheduling option for Threads.@threads (JuliaLang#43919)
Co-authored-by: Julian Samaroo <[email protected]> Co-authored-by: Takafumi Arakaki <[email protected]> Co-authored-by: Valentin Churavy <[email protected]>
1 parent b174ed8 commit 02b3f81

File tree

3 files changed

+132
-17
lines changed

3 files changed

+132
-17
lines changed

NEWS.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ Command-line option changes
7373
Multi-threading changes
7474
-----------------------
7575

76+
* A new `:dynamic` schedule option for `Threads.@threads` which is similar to the default behavior except iterations
77+
will be scheduled dynamically to available worker threads rather than pinned to each thread. This option is more
78+
composable with (possibly nested) `@spawn` and `@threads` loops ([#43919])
7679

7780
Build system changes
7881
--------------------

base/threadingconstructs.jl

Lines changed: 72 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@ See also: `BLAS.get_num_threads` and `BLAS.set_num_threads` in the
2222
"""
2323
nthreads() = Int(unsafe_load(cglobal(:jl_n_threads, Cint)))
2424

25-
function threading_run(func)
25+
function threading_run(fun, static)
2626
ccall(:jl_enter_threaded_region, Cvoid, ())
2727
n = nthreads()
2828
tasks = Vector{Task}(undef, n)
2929
for i = 1:n
30-
t = Task(func)
31-
t.sticky = true
32-
ccall(:jl_set_task_tid, Cint, (Any, Cint), t, i-1)
30+
t = Task(() -> fun(i)) # pass in tid
31+
t.sticky = static
32+
static && ccall(:jl_set_task_tid, Cint, (Any, Cint), t, i-1)
3333
tasks[i] = t
3434
schedule(t)
3535
end
@@ -48,15 +48,14 @@ function _threadsfor(iter, lbody, schedule)
4848
quote
4949
local threadsfor_fun
5050
let range = $(esc(range))
51-
function threadsfor_fun(onethread=false)
51+
function threadsfor_fun(tid=1; onethread=false)
5252
r = range # Load into local variable
5353
lenr = length(r)
5454
# divide loop iterations among threads
5555
if onethread
5656
tid = 1
5757
len, rem = lenr, 0
5858
else
59-
tid = threadid()
6059
len, rem = divrem(lenr, nthreads())
6160
end
6261
# not enough iterations for all the threads?
@@ -86,15 +85,17 @@ function _threadsfor(iter, lbody, schedule)
8685
end
8786
end
8887
end
89-
if ccall(:jl_in_threaded_region, Cint, ()) != 0
88+
if $(schedule === :dynamic)
89+
threading_run(threadsfor_fun, false)
90+
elseif ccall(:jl_in_threaded_region, Cint, ()) != 0
9091
$(if schedule === :static
9192
:(error("`@threads :static` cannot be used concurrently or nested"))
9293
else
9394
# only use threads when called from outside @threads
94-
:(threadsfor_fun(true))
95+
:(threadsfor_fun(onethread = true))
9596
end)
9697
else
97-
threading_run(threadsfor_fun)
98+
threading_run(threadsfor_fun, true)
9899
end
99100
nothing
100101
end
@@ -110,15 +111,73 @@ A barrier is placed at the end of the loop which waits for all tasks to finish
110111
execution.
111112
112113
The `schedule` argument can be used to request a particular scheduling policy.
113-
The only currently supported value is `:static`, which creates one task per thread
114-
and divides the iterations equally among them. Specifying `:static` is an error
115-
if used from inside another `@threads` loop or from a thread other than 1.
114+
115+
Except for `:static` scheduling, how the iterations are assigned to tasks, and how the tasks
116+
are assigned to the worker threads is undefined. The exact assignments can be different
117+
for each execution. The scheduling option is a hint. The loop body code (including any code
118+
transitively called from it) must not make assumptions about the distribution of iterations
119+
to tasks or the worker thread in which they are executed. The loop body for each iteration
120+
must be able to make forward progress independent of other iterations and be free from data
121+
races. As such, synchronizations across iterations may deadlock.
122+
123+
For example, the above conditions imply that:
124+
125+
- The lock taken in an iteration *must* be released within the same iteration.
126+
- Communicating between iterations using blocking primitives like `Channel`s is incorrect.
127+
- Write only to locations not shared across iterations (unless a lock or atomic operation is used).
128+
129+
130+
Schedule options are:
131+
- `:static` creates one task per thread and divides the iterations equally among
132+
them, assigning each task specifically to each thread.
133+
Specifying `:static` is an error if used from inside another `@threads` loop
134+
or from a thread other than 1.
135+
- `:dynamic` will schedule iterations dynamically to available worker threads,
136+
assuming that the workload for each iteration is uniform.
137+
138+
Without the scheduler argument, the exact scheduling is unspecified; i.e. it may be
139+
different across Julia releases. Currently, the behavior is dependent on the calling thread.
140+
The default is `:static` when called from thread 1. The loop will be executed without threading
141+
when called from other threads.
116142
117143
The default schedule (used when no `schedule` argument is present) is subject to change.
118144
145+
For example, an illustration of the different scheduling strategies where `busywait`
146+
is a non-yielding timed loop that runs for a number of seconds.
147+
148+
```julia-repl
149+
julia> function busywait(seconds)
150+
tstart = time_ns()
151+
while (time_ns() - tstart) / 1e9 < seconds
152+
end
153+
end
154+
155+
julia> @time begin
156+
Threads.@spawn busywait(5)
157+
Threads.@threads :static for i in 1:Threads.nthreads()
158+
busywait(1)
159+
end
160+
end
161+
6.003001 seconds (16.33 k allocations: 899.255 KiB, 0.25% compilation time)
162+
163+
julia> @time begin
164+
Threads.@spawn busywait(5)
165+
Threads.@threads :dynamic for i in 1:Threads.nthreads()
166+
busywait(1)
167+
end
168+
end
169+
2.012056 seconds (16.05 k allocations: 883.919 KiB, 0.66% compilation time)
170+
```
171+
172+
The `:dynamic` example takes 2 seconds since one of the non-occupied threads is able
173+
to run two of the 1-second iterations to complete the for loop.
174+
119175
!!! compat "Julia 1.5"
120176
The `schedule` argument is available as of Julia 1.5.
121177
178+
!!! compat "Julia 1.8"
179+
The `:dynamic` option for the `schedule` argument is available as of Julia 1.8.
180+
122181
See also: [`@spawn`](@ref Threads.@spawn), [`nthreads()`](@ref Threads.nthreads),
123182
[`threadid()`](@ref Threads.threadid), `pmap` in [`Distributed`](@ref man-distributed),
124183
`BLAS.set_num_threads` in [`LinearAlgebra`](@ref man-linalg).
@@ -133,7 +192,7 @@ macro threads(args...)
133192
# for now only allow quoted symbols
134193
sched = nothing
135194
end
136-
if sched !== :static
195+
if sched !== :static && sched !== :dynamic
137196
throw(ArgumentError("unsupported schedule argument in @threads"))
138197
end
139198
elseif na == 1

test/threads_exec.jl

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -722,15 +722,68 @@ let a = zeros(nthreads())
722722
end
723723

724724
# static schedule
725-
function _atthreads_static_schedule()
725+
function _atthreads_static_schedule(n)
726+
ids = zeros(Int, n)
727+
Threads.@threads :static for i = 1:n
728+
ids[i] = Threads.threadid()
729+
end
730+
return ids
731+
end
732+
@test _atthreads_static_schedule(nthreads()) == 1:nthreads()
733+
@test _atthreads_static_schedule(1) == [1;]
734+
@test_throws(
735+
"`@threads :static` cannot be used concurrently or nested",
736+
@threads(for i = 1:1; _atthreads_static_schedule(nthreads()); end),
737+
)
738+
739+
# dynamic schedule
740+
function _atthreads_dynamic_schedule(n)
741+
inc = Threads.Atomic{Int}(0)
742+
flags = zeros(Int, n)
743+
Threads.@threads :dynamic for i = 1:n
744+
Threads.atomic_add!(inc, 1)
745+
flags[i] = 1
746+
end
747+
return inc[], flags
748+
end
749+
@test _atthreads_dynamic_schedule(nthreads()) == (nthreads(), ones(nthreads()))
750+
@test _atthreads_dynamic_schedule(1) == (1, ones(1))
751+
@test _atthreads_dynamic_schedule(10) == (10, ones(10))
752+
@test _atthreads_dynamic_schedule(nthreads() * 2) == (nthreads() * 2, ones(nthreads() * 2))
753+
754+
# nested dynamic schedule
755+
function _atthreads_dynamic_dynamic_schedule()
756+
inc = Threads.Atomic{Int}(0)
757+
Threads.@threads :dynamic for _ = 1:nthreads()
758+
Threads.@threads :dynamic for _ = 1:nthreads()
759+
Threads.atomic_add!(inc, 1)
760+
end
761+
end
762+
return inc[]
763+
end
764+
@test _atthreads_dynamic_dynamic_schedule() == nthreads() * nthreads()
765+
766+
function _atthreads_static_dynamic_schedule()
726767
ids = zeros(Int, nthreads())
768+
inc = Threads.Atomic{Int}(0)
727769
Threads.@threads :static for i = 1:nthreads()
728770
ids[i] = Threads.threadid()
771+
Threads.@threads :dynamic for _ = 1:nthreads()
772+
Threads.atomic_add!(inc, 1)
773+
end
729774
end
730-
return ids
775+
return ids, inc[]
776+
end
777+
@test _atthreads_static_dynamic_schedule() == (1:nthreads(), nthreads() * nthreads())
778+
779+
# errors inside @threads :dynamic
780+
function _atthreads_dynamic_with_error(a)
781+
Threads.@threads :dynamic for i in eachindex(a)
782+
error("user error in the loop body")
783+
end
784+
a
731785
end
732-
@test _atthreads_static_schedule() == [1:nthreads();]
733-
@test_throws TaskFailedException @threads for i = 1:1; _atthreads_static_schedule(); end
786+
@test_throws "user error in the loop body" _atthreads_dynamic_with_error(zeros(nthreads()))
734787

735788
try
736789
@macroexpand @threads(for i = 1:10, j = 1:10; end)

0 commit comments

Comments
 (0)