Skip to content

Commit c516b7d

Browse files
committed
spawn: permit using IOBuffer at stdout
People expect to use this (the docs even almost even suggested it at some point), so it is better to make it work as expected (and better than they can emulate) than to criticize their choices. Also fix a few regressions and handling mistakes in setup_stdios: - #44500 tried to store a Redirectable into a SpawnIO, dropping FileRedirect - CmdRedirect did not allocate a ProcessChain, so it wouldd call setup_stdio then call setup_stdios on the result of that, which is strongly discouraged as setup_stdio(s) should only be called once - BufferStream was missing `check_open` calls before writing, and ignored `Base.reseteof` as a possible means of resuming writing after `closewrite` sends a shutdown message. Currently this fix is disabled because Pkg seems like a bit of a disaster with IO mismanagement. - Add `closewrite` to more methods, and document it. Fixes #39311 Fixes #49234 Closes #49233 Closes #46768
1 parent 649982a commit c516b7d

File tree

12 files changed

+146
-54
lines changed

12 files changed

+146
-54
lines changed

NEWS.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,14 @@ New library features
7272
write the output to a stream rather than returning a string ([#48625]).
7373
* `sizehint!(s, n)` now supports an optional `shrink` argument to disable shrinking ([#51929]).
7474
* New function `Docs.hasdoc(module, symbol)` tells whether a name has a docstring ([#52139]).
75+
* Passing an IOBuffer as a stdout argument for Process spawn now works as
76+
expected, synchronized with `wait` or `success`, so a `Base.BufferStream` is
77+
no longer required there for correctness to avoid data-races ([#TBD]).
78+
* After a process exits, `closewrite` will no longer be automatically called on
79+
the stream passed to it. Call `wait` on the process instead to ensure the
80+
content is fully written, then call `closewrite` manually to avoid
81+
data-races. Or use the callback form of `open` to have all that handled
82+
automatically.
7583

7684
Standard library changes
7785
------------------------

base/coreio.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ struct DevNull <: IO end
1111
const devnull = DevNull()
1212
write(::DevNull, ::UInt8) = 1
1313
unsafe_write(::DevNull, ::Ptr{UInt8}, n::UInt)::Int = n
14+
closewrite(::DevNull) = nothing
1415
close(::DevNull) = nothing
1516
wait_close(::DevNull) = wait()
1617
bytesavailable(io::DevNull) = 0

base/exports.jl

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1155,6 +1155,16 @@ public
11551155
@locals,
11561156
@propagate_inbounds,
11571157

1158+
# IO
1159+
# types
1160+
BufferStream,
1161+
IOServer,
1162+
OS_HANDLE,
1163+
PipeEndpoint,
1164+
TTY,
1165+
# functions
1166+
reseteof,
1167+
11581168
# misc
11591169
notnothing,
11601170
runtests,

base/filesystem.jl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,8 @@ function close(f::File)
143143
nothing
144144
end
145145

146+
closewrite(f::File) = nothing
147+
146148
# sendfile is the most efficient way to copy from a file descriptor
147149
function sendfile(dst::File, src::File, src_offset::Int64, bytes::Int)
148150
check_open(dst)

base/io.jl

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,14 @@ end
2525

2626
lock(::IO) = nothing
2727
unlock(::IO) = nothing
28+
29+
"""
30+
reseteof(io)
31+
32+
Clear the EOF flag from IO so that further reads (and possibly writes) are
33+
again allowed. Note that it may immediately get re-set, if the underlying
34+
stream object is at EOF and cannot be resumed.
35+
"""
2836
reseteof(x::IO) = nothing
2937

3038
const SZ_UNBUFFERED_IO = 65536
@@ -68,6 +76,10 @@ Shutdown the write half of a full-duplex I/O stream. Performs a [`flush`](@ref)
6876
first. Notify the other end that no more data will be written to the underlying
6977
file. This is not supported by all IO types.
7078
79+
If implemented, `closewrite` causes subsequent `read` or `eof` calls that would
80+
block to instead throw EOF or return true, respectively. If the stream is
81+
already closed, this is idempotent.
82+
7183
# Examples
7284
```jldoctest
7385
julia> io = Base.BufferStream(); # this never blocks, so we can read and write on the same Task

base/iobuffer.jl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,6 @@ eof(io::GenericIOBuffer) = (io.ptr-1 == io.size)
349349

350350
function closewrite(io::GenericIOBuffer)
351351
io.writable = false
352-
# OR throw(_UVError("closewrite", UV_ENOTSOCK))
353352
nothing
354353
end
355354

base/iostream.jl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ function close(s::IOStream)
6363
systemerror("close", bad)
6464
end
6565

66+
closewrite(s::IOStream) = nothing
67+
6668
function flush(s::IOStream)
6769
sigatomic_begin()
6870
bad = @_lock_ios s ccall(:ios_flush, Cint, (Ptr{Cvoid},), s.ios) != 0

base/process.jl

Lines changed: 84 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@ mutable struct Process <: AbstractPipe
66
in::IO
77
out::IO
88
err::IO
9+
syncd::Vector{Task}
910
exitcode::Int64
1011
termsignal::Int32
1112
exitnotify::ThreadSynchronizer
12-
function Process(cmd::Cmd, handle::Ptr{Cvoid})
13-
this = new(cmd, handle, devnull, devnull, devnull,
13+
function Process(cmd::Cmd, handle::Ptr{Cvoid}, syncd::Vector{Task})
14+
this = new(cmd, handle, devnull, devnull, devnull, syncd,
1415
typemin(fieldtype(Process, :exitcode)),
1516
typemin(fieldtype(Process, :termsignal)),
1617
ThreadSynchronizer())
@@ -35,6 +36,15 @@ end
3536
pipe_reader(p::ProcessChain) = p.out
3637
pipe_writer(p::ProcessChain) = p.in
3738

39+
# a lightweight pair of a child OS_HANDLE and associated Task that will
40+
# complete only after all content has been read from it for synchronizing
41+
# state without the kernel to aide
42+
struct SyncCloseFD
43+
fd
44+
t::Task
45+
end
46+
rawhandle(io::SyncCloseFD) = rawhandle(io.fd)
47+
3848
# release ownership of the libuv handle
3949
function uvfinalize(proc::Process)
4050
if proc.handle != C_NULL
@@ -74,8 +84,8 @@ function _uv_hook_close(proc::Process)
7484
nothing
7585
end
7686

77-
const SpawnIO = Union{IO, RawFD, OS_HANDLE}
78-
const SpawnIOs = Vector{SpawnIO} # convenience name for readability
87+
const SpawnIO = Union{IO, RawFD, OS_HANDLE, SyncCloseFD} # internal copy of Redirectable, removing FileRedirect and adding SyncCloseFD
88+
const SpawnIOs = Memory{SpawnIO} # convenience name for readability (used for dispatch also to clearly distinguish from Vector{Redirectable})
7989

8090
function as_cpumask(cpus::Vector{UInt16})
8191
n = max(Int(maximum(cpus)), Int(ccall(:uv_cpumask_size, Cint, ())))
@@ -100,6 +110,7 @@ end
100110
error("invalid spawn handle $h from $io")
101111
end
102112
for io in stdio]
113+
syncd = Task[io.t for io in stdio if io isa SyncCloseFD]
103114
handle = Libc.malloc(_sizeof_uv_process)
104115
disassociate_julia_struct(handle)
105116
(; exec, flags, env, dir) = cmd
@@ -117,7 +128,7 @@ end
117128
cpumask === nothing ? 0 : length(cpumask),
118129
@cfunction(uv_return_spawn, Cvoid, (Ptr{Cvoid}, Int64, Int32)))
119130
if err == 0
120-
pp = Process(cmd, handle)
131+
pp = Process(cmd, handle, syncd)
121132
associate_julia_struct(handle, pp)
122133
else
123134
ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), handle) # will call free on handle eventually
@@ -130,23 +141,24 @@ end
130141
return pp
131142
end
132143

133-
_spawn(cmds::AbstractCmd) = _spawn(cmds, SpawnIO[])
144+
_spawn(cmds::AbstractCmd) = _spawn(cmds, SpawnIOs())
134145

135-
# optimization: we can spawn `Cmd` directly without allocating the ProcessChain
136-
function _spawn(cmd::Cmd, stdios::SpawnIOs)
137-
isempty(cmd.exec) && throw(ArgumentError("cannot spawn empty command"))
146+
function _spawn(cmd::AbstractCmd, stdios::Vector{Redirectable})
138147
pp = setup_stdios(stdios) do stdios
139-
return _spawn_primitive(cmd.exec[1], cmd, stdios)
148+
return _spawn(cmd, stdios)
140149
end
141150
return pp
142151
end
143152

153+
# optimization: we can spawn `Cmd` directly without allocating the ProcessChain
154+
function _spawn(cmd::Cmd, stdios::SpawnIOs)
155+
isempty(cmd.exec) && throw(ArgumentError("cannot spawn empty command"))
156+
return _spawn_primitive(cmd.exec[1], cmd, stdios)
157+
end
158+
144159
# assume that having a ProcessChain means that the stdio are setup
145160
function _spawn(cmds::AbstractCmd, stdios::SpawnIOs)
146-
pp = setup_stdios(stdios) do stdios
147-
return _spawn(cmds, stdios, ProcessChain())
148-
end
149-
return pp
161+
return _spawn(cmds, stdios, ProcessChain())
150162
end
151163

152164
# helper function for making a copy of a SpawnIOs, with replacement
@@ -212,7 +224,7 @@ end
212224

213225

214226
# open the child end of each element of `stdios`, and initialize the parent end
215-
function setup_stdios(f, stdios::SpawnIOs)
227+
function setup_stdios(f, stdios::Vector{Redirectable})
216228
nstdio = length(stdios)
217229
open_io = SpawnIOs(undef, nstdio)
218230
close_io = falses(nstdio)
@@ -295,25 +307,26 @@ function setup_stdio(stdio::IO, child_readable::Bool)
295307
child = child_readable ? rd : wr
296308
try
297309
let in = (child_readable ? parent : stdio),
298-
out = (child_readable ? stdio : parent)
299-
@async try
310+
out = (child_readable ? stdio : parent),
311+
t = @async try
300312
write(in, out)
301313
catch ex
302314
@warn "Process I/O error" exception=(ex, catch_backtrace())
315+
rethrow()
303316
finally
304317
close(parent)
305-
child_readable || closewrite(stdio)
306318
end
319+
return (SyncCloseFD(child, t), true)
307320
end
308321
catch
309322
close_pipe_sync(child)
310323
rethrow()
311324
end
312-
return (child, true)
313325
end
314326

315-
close_stdio(stdio::OS_HANDLE) = close_pipe_sync(stdio)
316327
close_stdio(stdio) = close(stdio)
328+
close_stdio(stdio::OS_HANDLE) = close_pipe_sync(stdio)
329+
close_stdio(stdio::SyncCloseFD) = close_stdio(stdio.fd)
317330

318331
# INTERNAL
319332
# pad out stdio to have at least three elements,
@@ -325,19 +338,19 @@ close_stdio(stdio) = close(stdio)
325338
# - An Filesystem.File or IOStream object to redirect the output to
326339
# - A FileRedirect, containing a string specifying a filename to be opened for the child
327340

328-
spawn_opts_swallow(stdios::StdIOSet) = SpawnIO[stdios...]
329-
spawn_opts_inherit(stdios::StdIOSet) = SpawnIO[stdios...]
341+
spawn_opts_swallow(stdios::StdIOSet) = Redirectable[stdios...]
342+
spawn_opts_inherit(stdios::StdIOSet) = Redirectable[stdios...]
330343
spawn_opts_swallow(in::Redirectable=devnull, out::Redirectable=devnull, err::Redirectable=devnull) =
331-
SpawnIO[in, out, err]
344+
Redirectable[in, out, err]
332345
# pass original descriptors to child processes by default, because we might
333346
# have already exhausted and closed the libuv object for our standard streams.
334347
# ref issue #8529
335348
spawn_opts_inherit(in::Redirectable=RawFD(0), out::Redirectable=RawFD(1), err::Redirectable=RawFD(2)) =
336-
SpawnIO[in, out, err]
349+
Redirectable[in, out, err]
337350

338351
function eachline(cmd::AbstractCmd; keep::Bool=false)
339352
out = PipeEndpoint()
340-
processes = _spawn(cmd, SpawnIO[devnull, out, stderr])
353+
processes = _spawn(cmd, Redirectable[devnull, out, stderr])
341354
# if the user consumes all the data, also check process exit status for success
342355
ondone = () -> (success(processes) || pipeline_error(processes); nothing)
343356
return EachLine(out, keep=keep, ondone=ondone)::EachLine
@@ -385,20 +398,20 @@ function open(cmds::AbstractCmd, stdio::Redirectable=devnull; write::Bool=false,
385398
stdio === devnull || throw(ArgumentError("no stream can be specified for `stdio` in read-write mode"))
386399
in = PipeEndpoint()
387400
out = PipeEndpoint()
388-
processes = _spawn(cmds, SpawnIO[in, out, stderr])
401+
processes = _spawn(cmds, Redirectable[in, out, stderr])
389402
processes.in = in
390403
processes.out = out
391404
elseif read
392405
out = PipeEndpoint()
393-
processes = _spawn(cmds, SpawnIO[stdio, out, stderr])
406+
processes = _spawn(cmds, Redirectable[stdio, out, stderr])
394407
processes.out = out
395408
elseif write
396409
in = PipeEndpoint()
397-
processes = _spawn(cmds, SpawnIO[in, stdio, stderr])
410+
processes = _spawn(cmds, Redirectable[in, stdio, stderr])
398411
processes.in = in
399412
else
400413
stdio === devnull || throw(ArgumentError("no stream can be specified for `stdio` in no-access mode"))
401-
processes = _spawn(cmds, SpawnIO[devnull, devnull, stderr])
414+
processes = _spawn(cmds, Redirectable[devnull, devnull, stderr])
402415
end
403416
return processes
404417
end
@@ -415,12 +428,18 @@ function open(f::Function, cmds::AbstractCmd, args...; kwargs...)
415428
P = open(cmds, args...; kwargs...)
416429
function waitkill(P::Union{Process,ProcessChain})
417430
close(P)
418-
# 0.1 seconds after we hope it dies (from closing stdio),
419-
# we kill the process with SIGTERM (15)
420-
local t = Timer(0.1) do t
431+
# shortly after we hope it starts cleanup and dies (from closing
432+
# stdio), we kill the process with SIGTERM (15) so that we can proceed
433+
# with throwing the error and hope it will exit soon from that
434+
local t = Timer(2) do t
421435
process_running(P) && kill(P)
422436
end
423-
wait(P)
437+
# pass false to indicate that we do not care about data-races on the
438+
# Julia stdio objects after this point, since we already know this is
439+
# an error path and the state of them is fairly unpredictable anyways
440+
# in that case. Since we closed P some of those should come crumbling
441+
# down already, and we don't want to throw that error here either.
442+
wait(P, false)
424443
close(t)
425444
end
426445
ret = try
@@ -430,10 +449,23 @@ function open(f::Function, cmds::AbstractCmd, args...; kwargs...)
430449
rethrow()
431450
end
432451
close(P.in)
452+
closestdio = @async begin
453+
# wait for P to complete (including sync'd), then mark the output streams for EOF (if applicable to that stream type)
454+
wait(P)
455+
err = P.err
456+
applicable(closewrite, err) && closewrite(err)
457+
out = P.out
458+
applicable(closewrite, out) && closewrite(out)
459+
nothing
460+
end
461+
# now verify that the output stream is at EOF, and the user didn't fail to consume it successfully
462+
# (we do not currently verify the user dealt with the stderr stream)
433463
if !(eof(P.out)::Bool)
434464
waitkill(P)
435465
throw(_UVError("open(do)", UV_EPIPE))
436466
end
467+
# make sure to closestdio is completely done to avoid data-races later
468+
wait(closestdio)
437469
success(P) || pipeline_error(P)
438470
return ret
439471
end
@@ -650,26 +682,31 @@ function process_status(s::Process)
650682
error("process status error")
651683
end
652684

653-
function wait(x::Process)
654-
process_exited(x) && return
655-
iolock_begin()
685+
function wait(x::Process, syncd::Bool=true)
656686
if !process_exited(x)
657-
preserve_handle(x)
658-
lock(x.exitnotify)
659-
iolock_end()
660-
try
661-
wait(x.exitnotify)
662-
finally
663-
unlock(x.exitnotify)
664-
unpreserve_handle(x)
687+
iolock_begin()
688+
if !process_exited(x)
689+
preserve_handle(x)
690+
lock(x.exitnotify)
691+
iolock_end()
692+
try
693+
wait(x.exitnotify)
694+
finally
695+
unlock(x.exitnotify)
696+
unpreserve_handle(x)
697+
end
698+
else
699+
iolock_end()
665700
end
666-
else
667-
iolock_end()
701+
end
702+
# and make sure all sync'd Tasks are complete too
703+
syncd && for t in x.syncd
704+
wait(t)
668705
end
669706
nothing
670707
end
671708

672-
wait(x::ProcessChain) = foreach(wait, x.processes)
709+
wait(x::ProcessChain, syncd::Bool=true) = foreach(p -> wait(p, syncd), x.processes)
673710

674711
show(io::IO, p::Process) = print(io, "Process(", p.cmd, ", ", process_status(p), ")")
675712

0 commit comments

Comments
 (0)