Skip to content

Commit 52125d7

Browse files
IanButterworthKristofferC
authored andcommitted
Logging: Improve threadsafety (#57591)
Closes #57376 Closes #34037 - Adds a lock in `SimpleLogger` and `ConsoleLogger` for use on maxlog tracking and stream writes to improve threadsafety. Closely similar to #54497 - Turns the internal `_min_enabled_level` into a `Threads.Atomic`. There are [some direct interactions](https://juliahub.com/ui/Search?type=code&q=_min_enabled_level&w=true) to this internal in the ecosystem, but they should still work ``` julia> Base.CoreLogging._min_enabled_level[] = Logging.Info+1 LogLevel(1) ``` - Brings tests over from #57448 Performance seems highly similar: ``` julia> @time for i in 1:10000 @info "foo" maxlog=10000000 end [ Info: foo ... 0.481446 seconds (1.33 M allocations: 89.226 MiB, 0.49% gc time) ``` ``` 0.477235 seconds (1.31 M allocations: 79.002 MiB, 1.77% gc time) ``` (cherry picked from commit 9af9650)
1 parent 2fbe680 commit 52125d7

File tree

6 files changed

+99
-24
lines changed

6 files changed

+99
-24
lines changed

base/Base.jl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -330,10 +330,6 @@ using .Libc: getpid, gethostname, time, memcpy, memset, memmove, memcmp
330330
const libblas_name = "libblastrampoline" * (Sys.iswindows() ? "-5" : "")
331331
const liblapack_name = libblas_name
332332

333-
# Logging
334-
include("logging.jl")
335-
using .CoreLogging
336-
337333
# Concurrency
338334
include("linked_list.jl")
339335
include("condition.jl")
@@ -345,6 +341,10 @@ include("task.jl")
345341
include("threads_overloads.jl")
346342
include("weakkeydict.jl")
347343

344+
# Logging
345+
include("logging.jl")
346+
using .CoreLogging
347+
348348
include("env.jl")
349349

350350
# functions defined in Random

base/logging.jl

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ isless(a::LogLevel, b::LogLevel) = isless(a.level, b.level)
131131
+(level::LogLevel, inc::Integer) = LogLevel(level.level+inc)
132132
-(level::LogLevel, inc::Integer) = LogLevel(level.level-inc)
133133
convert(::Type{LogLevel}, level::Integer) = LogLevel(level)
134+
convert(::Type{Int32}, level::LogLevel) = level.level
134135

135136
const BelowMinLevel = LogLevel(-1000001)
136137
"""
@@ -160,7 +161,8 @@ const Error = LogLevel( 2000)
160161
const AboveMaxLevel = LogLevel( 1000001)
161162

162163
# Global log limiting mechanism for super fast but inflexible global log limiting.
163-
const _min_enabled_level = Ref{LogLevel}(Debug)
164+
# Atomic ensures that the value is always consistent across threads.
165+
const _min_enabled_level = Threads.Atomic{Int32}(Debug)
164166

165167
function show(io::IO, level::LogLevel)
166168
if level == BelowMinLevel print(io, "BelowMinLevel")
@@ -383,7 +385,7 @@ function logmsg_code(_module, file, line, level, message, exs...)
383385
level = $level
384386
# simplify std_level code emitted, if we know it is one of our global constants
385387
std_level = $(level isa Symbol ? :level : :(level isa $LogLevel ? level : convert($LogLevel, level)::$LogLevel))
386-
if std_level >= $(_min_enabled_level)[]
388+
if std_level.level >= $(_min_enabled_level)[]
387389
group = $(log_data._group)
388390
_module = $(log_data._module)
389391
logger = $(current_logger_for_env)(std_level, group, _module)
@@ -526,7 +528,8 @@ end
526528
527529
Disable all log messages at log levels equal to or less than `level`. This is
528530
a *global* setting, intended to make debug logging extremely cheap when
529-
disabled.
531+
disabled. Note that this cannot be used to enable logging that is currently disabled
532+
by other mechanisms.
530533
531534
# Examples
532535
```julia
@@ -646,17 +649,21 @@ close(closed_stream)
646649
Simplistic logger for logging all messages with level greater than or equal to
647650
`min_level` to `stream`. If stream is closed then messages with log level
648651
greater or equal to `Warn` will be logged to `stderr` and below to `stdout`.
652+
653+
This Logger is thread-safe, with a lock taken around orchestration of message
654+
limits i.e. `maxlog`, and writes to the stream.
649655
"""
650656
struct SimpleLogger <: AbstractLogger
651657
stream::IO
658+
lock::ReentrantLock
652659
min_level::LogLevel
653660
message_limits::Dict{Any,Int}
654661
end
655-
SimpleLogger(stream::IO, level=Info) = SimpleLogger(stream, level, Dict{Any,Int}())
662+
SimpleLogger(stream::IO, level=Info) = SimpleLogger(stream, ReentrantLock(), level, Dict{Any,Int}())
656663
SimpleLogger(level=Info) = SimpleLogger(closed_stream, level)
657664

658665
shouldlog(logger::SimpleLogger, level, _module, group, id) =
659-
get(logger.message_limits, id, 1) > 0
666+
@lock logger.lock get(logger.message_limits, id, 1) > 0
660667

661668
min_enabled_level(logger::SimpleLogger) = logger.min_level
662669

@@ -667,15 +674,14 @@ function handle_message(logger::SimpleLogger, level::LogLevel, message, _module,
667674
@nospecialize
668675
maxlog = get(kwargs, :maxlog, nothing)
669676
if maxlog isa Core.BuiltinInts
670-
remaining = get!(logger.message_limits, id, Int(maxlog)::Int)
671-
logger.message_limits[id] = remaining - 1
672-
remaining > 0 || return
677+
@lock logger.lock begin
678+
remaining = get!(logger.message_limits, id, Int(maxlog)::Int)
679+
remaining == 0 && return
680+
logger.message_limits[id] = remaining - 1
681+
end
673682
end
674683
buf = IOBuffer()
675684
stream::IO = logger.stream
676-
if !(isopen(stream)::Bool)
677-
stream = stderr
678-
end
679685
iob = IOContext(buf, stream)
680686
levelstr = level == Warn ? "Warning" : string(level)
681687
msglines = eachsplit(chomp(convert(String, string(message))::String), '\n')
@@ -689,7 +695,13 @@ function handle_message(logger::SimpleLogger, level::LogLevel, message, _module,
689695
println(iob, "", key, " = ", val)
690696
end
691697
println(iob, "└ @ ", _module, " ", filepath, ":", line)
692-
write(stream, take!(buf))
698+
b = take!(buf)
699+
@lock logger.lock begin
700+
if !(isopen(stream)::Bool)
701+
stream = stderr
702+
end
703+
write(stream, b)
704+
end
693705
nothing
694706
end
695707

stdlib/Logging/src/ConsoleLogger.jl

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ interactive work with the Julia REPL.
99
1010
Log levels less than `min_level` are filtered out.
1111
12+
This Logger is thread-safe, with locks for both orchestration of message
13+
limits i.e. `maxlog`, and writes to the stream.
14+
1215
Message formatting can be controlled by setting keyword arguments:
1316
1417
* `meta_formatter` is a function which takes the log event metadata
@@ -24,6 +27,7 @@ Message formatting can be controlled by setting keyword arguments:
2427
"""
2528
struct ConsoleLogger <: AbstractLogger
2629
stream::IO
30+
lock::ReentrantLock # do not log within this lock
2731
min_level::LogLevel
2832
meta_formatter
2933
show_limited::Bool
@@ -33,19 +37,19 @@ end
3337
function ConsoleLogger(stream::IO, min_level=Info;
3438
meta_formatter=default_metafmt, show_limited=true,
3539
right_justify=0)
36-
ConsoleLogger(stream, min_level, meta_formatter,
40+
ConsoleLogger(stream, ReentrantLock(), min_level, meta_formatter,
3741
show_limited, right_justify, Dict{Any,Int}())
3842
end
3943
function ConsoleLogger(min_level=Info;
4044
meta_formatter=default_metafmt, show_limited=true,
4145
right_justify=0)
42-
ConsoleLogger(closed_stream, min_level, meta_formatter,
46+
ConsoleLogger(closed_stream, ReentrantLock(), min_level, meta_formatter,
4347
show_limited, right_justify, Dict{Any,Int}())
4448
end
4549

4650

4751
shouldlog(logger::ConsoleLogger, level, _module, group, id) =
48-
get(logger.message_limits, id, 1) > 0
52+
@lock logger.lock get(logger.message_limits, id, 1) > 0
4953

5054
min_enabled_level(logger::ConsoleLogger) = logger.min_level
5155

@@ -109,9 +113,11 @@ function handle_message(logger::ConsoleLogger, level::LogLevel, message, _module
109113
hasmaxlog = haskey(kwargs, :maxlog) ? 1 : 0
110114
maxlog = get(kwargs, :maxlog, nothing)
111115
if maxlog isa Core.BuiltinInts
112-
remaining = get!(logger.message_limits, id, Int(maxlog)::Int)
113-
logger.message_limits[id] = remaining - 1
114-
remaining > 0 || return
116+
@lock logger.lock begin
117+
remaining = get!(logger.message_limits, id, Int(maxlog)::Int)
118+
remaining == 0 && return
119+
logger.message_limits[id] = remaining - 1
120+
end
115121
end
116122

117123
# Generate a text representation of the message and all key value pairs,
@@ -175,6 +181,7 @@ function handle_message(logger::ConsoleLogger, level::LogLevel, message, _module
175181
println(iob)
176182
end
177183

178-
write(stream, take!(buf))
184+
b = take!(buf)
185+
@lock logger.lock write(stream, b)
179186
nothing
180187
end

stdlib/Logging/test/runtests.jl

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,4 +292,47 @@ end
292292
@test occursin("LogLevel(-500): a", String(take!(buf)))
293293
end
294294

295+
@testset "Logging when multithreaded" begin
296+
n = 10000
297+
cmd = `$(Base.julia_cmd()) -t4 --color=no $(joinpath(@__DIR__, "threads_exec.jl")) $n`
298+
fname = tempname()
299+
@testset "Thread safety" begin
300+
f = open(fname, "w")
301+
@test success(run(pipeline(cmd, stderr=f)))
302+
close(f)
303+
end
304+
305+
@testset "No tearing in log printing" begin
306+
# Check for print tearing by verifying that each log entry starts and ends correctly
307+
f = open(fname, "r")
308+
entry_start = r"┌ (Info|Warning|Error): iteration"
309+
entry_end = r""
310+
311+
open_entries = 0
312+
total_entries = 0
313+
for line in eachline(fname)
314+
starts = count(entry_start, line)
315+
starts > 1 && error("Interleaved logs: Multiple log entries started on one line")
316+
if starts == 1
317+
startswith(line, entry_start) || error("Interleaved logs: Log entry started in the middle of a line")
318+
open_entries += 1
319+
total_entries += 1
320+
end
321+
322+
ends = count(entry_end, line)
323+
starts == 1 && ends == 1 && error("Interleaved logs: Log entry started and and another ended on one line")
324+
ends > 1 && error("Interleaved logs: Multiple log entries ended on one line")
325+
if ends == 1
326+
startswith(line, entry_end) || error("Interleaved logs: Log entry ended in the middle of a line")
327+
open_entries -= 1
328+
end
329+
# Ensure no mismatched log entries
330+
open_entries >= 0 || error("Interleaved logs")
331+
end
332+
333+
@test open_entries == 0 # Ensure all entries closed properly
334+
@test total_entries == n * 3 # Ensure all logs were printed (3 because @debug is hidden)
335+
end
336+
end
337+
295338
end

stdlib/Logging/test/threads_exec.jl

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
using Logging
2+
3+
function test_threads_exec(n)
4+
Threads.@threads for i in 1:n
5+
@debug "iteration" maxlog=1 _id=Symbol("$(i)_debug") i Threads.threadid()
6+
@info "iteration" maxlog=1 _id=Symbol("$(i)_info") i Threads.threadid()
7+
@warn "iteration" maxlog=1 _id=Symbol("$(i)_warn") i Threads.threadid()
8+
@error "iteration" maxlog=1 _id=Symbol("$(i)_error") i Threads.threadid()
9+
end
10+
end
11+
12+
n = parse(Int, ARGS[1])
13+
test_threads_exec(n)

stdlib/Test/src/logging.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,8 @@ function Logging.handle_message(logger::TestLogger, level, msg, _module,
107107
if maxlog isa Core.BuiltinInts
108108
@lock logger.lock begin
109109
remaining = get!(logger.message_limits, id, Int(maxlog)::Int)
110+
remaining == 0 && return
110111
logger.message_limits[id] = remaining - 1
111-
remaining > 0 || return
112112
end
113113
end
114114
end

0 commit comments

Comments
 (0)