Skip to content

Commit faafb78

Browse files
authored
add threads static to aid thread API evolution (#35646)
1 parent 5e25ed9 commit faafb78

File tree

4 files changed

+63
-21
lines changed

4 files changed

+63
-21
lines changed

NEWS.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@ Command-line option changes
104104

105105
Multi-threading changes
106106
-----------------------
107+
* `@threads` now allows an optional schedule argument. Use `@threads :static ...` to
108+
ensure that the same schedule will be used as in past versions; the default schedule
109+
is likely to change in the future.
107110

108111

109112
Build system changes

base/threadingconstructs.jl

Lines changed: 44 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ function threading_run(func)
3838
end
3939
end
4040

41-
function _threadsfor(iter,lbody)
41+
function _threadsfor(iter, lbody, schedule)
4242
lidx = iter.args[1] # index
4343
range = iter.args[2]
4444
quote
@@ -82,9 +82,13 @@ function _threadsfor(iter,lbody)
8282
end
8383
end
8484
end
85-
if threadid() != 1
86-
# only thread 1 can enter/exit _threadedregion
87-
Base.invokelatest(threadsfor_fun, true)
85+
if threadid() != 1 || ccall(:jl_in_threaded_region, Cint, ()) != 0
86+
$(if schedule === :static
87+
:(error("`@threads :static` can only be used from thread 1 and not nested"))
88+
else
89+
# only use threads when called from thread 1, outside @threads
90+
:(Base.invokelatest(threadsfor_fun, true))
91+
end)
8892
else
8993
threading_run(threadsfor_fun)
9094
end
@@ -93,31 +97,50 @@ function _threadsfor(iter,lbody)
9397
end
9498

9599
"""
96-
Threads.@threads
100+
Threads.@threads [schedule] for ... end
97101
98-
A macro to parallelize a for-loop to run with multiple threads. This spawns [`nthreads()`](@ref)
99-
number of threads, splits the iteration space amongst them, and iterates in parallel.
100-
A barrier is placed at the end of the loop which waits for all the threads to finish
101-
execution, and the loop returns.
102+
A macro to parallelize a `for` loop to run with multiple threads. Splits the iteration
103+
space among multiple tasks and runs those tasks on threads according to a scheduling
104+
policy.
105+
A barrier is placed at the end of the loop which waits for all tasks to finish
106+
execution.
107+
108+
The `schedule` argument can be used to request a particular scheduling policy.
109+
The only currently supported value is `:static`, which creates one task per thread
110+
and divides the iterations equally among them. Specifying `:static` is an error
111+
if used from inside another `@threads` loop or from a thread other than 1.
112+
113+
The default schedule (used when no `schedule` argument is present) is subject to change.
114+
115+
!!! compat "Julia 1.5"
116+
The `schedule` argument is available as of Julia 1.5.
102117
"""
103118
macro threads(args...)
104119
na = length(args)
105-
if na != 1
120+
if na == 2
121+
sched, ex = args
122+
if sched isa QuoteNode
123+
sched = sched.value
124+
elseif sched isa Symbol
125+
# for now only allow quoted symbols
126+
sched = nothing
127+
end
128+
if sched !== :static
129+
throw(ArgumentError("unsupported schedule argument in @threads"))
130+
end
131+
elseif na == 1
132+
sched = :default
133+
ex = args[1]
134+
else
106135
throw(ArgumentError("wrong number of arguments in @threads"))
107136
end
108-
ex = args[1]
109-
if !isa(ex, Expr)
110-
throw(ArgumentError("need an expression argument to @threads"))
137+
if !(isa(ex, Expr) && ex.head === :for)
138+
throw(ArgumentError("@threads requires a `for` loop expression"))
111139
end
112-
if ex.head === :for
113-
if ex.args[1] isa Expr && ex.args[1].head === :(=)
114-
return _threadsfor(ex.args[1], ex.args[2])
115-
else
116-
throw(ArgumentError("nested outer loops are not currently supported by @threads"))
117-
end
118-
else
119-
throw(ArgumentError("unrecognized argument to @threads"))
140+
if !(ex.args[1] isa Expr && ex.args[1].head === :(=))
141+
throw(ArgumentError("nested outer loops are not currently supported by @threads"))
120142
end
143+
return _threadsfor(ex.args[1], ex.args[2], sched)
121144
end
122145

123146
"""

src/threading.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,11 @@ void jl_start_threads(void)
481481

482482
unsigned volatile _threadedregion; // HACK: keep track of whether it is safe to do IO
483483

484+
JL_DLLEXPORT int jl_in_threaded_region(void)
485+
{
486+
return _threadedregion != 0;
487+
}
488+
484489
JL_DLLEXPORT void jl_enter_threaded_region(void)
485490
{
486491
_threadedregion += 1;

test/threads_exec.jl

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -729,6 +729,17 @@ let a = zeros(nthreads())
729729
@test a == [1:nthreads();]
730730
end
731731

732+
# static schedule
733+
function _atthreads_static_schedule()
734+
ids = zeros(Int, nthreads())
735+
Threads.@threads :static for i = 1:nthreads()
736+
ids[i] = Threads.threadid()
737+
end
738+
return ids
739+
end
740+
@test _atthreads_static_schedule() == [1:nthreads();]
741+
@test_throws TaskFailedException @threads for i = 1:1; _atthreads_static_schedule(); end
742+
732743
try
733744
@macroexpand @threads(for i = 1:10, j = 1:10; end)
734745
catch ex

0 commit comments

Comments
 (0)