Skip to content

Support passing custom pipes and file descriptors to child process #65

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 63 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ as [Streams](https://github.com/reactphp/stream).
* [Stream Properties](#stream-properties)
* [Command](#command)
* [Termination](#termination)
* [Custom pipes](#custom-pipes)
* [Sigchild Compatibility](#sigchild-compatibility)
* [Windows Compatibility](#windows-compatibility)
* [Install](#install)
Expand Down Expand Up @@ -55,21 +56,31 @@ Once a process is started, its I/O streams will be constructed as instances of
Before `start()` is called, these properties are not set. Once a process terminates,
the streams will become closed but not unset.

Following common Unix conventions, this library will start each child process
with the three pipes matching the standard I/O streams as given below by default.
You can use the named references for common use cases or access these as an
array with all three pipes.

* `$stdin` or `$pipes[0]` is a `WritableStreamInterface`
* `$stdout` or `$pipes[1]` is a `ReadableStreamInterface`
* `$stderr` or `$pipes[2]` is a `ReadableStreamInterface`

Following common Unix conventions, this library will always start each child
process with the three pipes matching the standard I/O streams as given above.
You can use the named references for common use cases or access these as an
array with all three pipes.
Note that this default configuration may be overridden by explicitly passing
[custom pipes](#custom-pipes), in which case they may not be set or be assigned
different values. The `$pipes` array will always contain references to all pipes
as configured and the standard I/O references will always be set to reference
the pipes matching the above conventions. See [custom pipes](#custom-pipes) for
more details.

Because each of these implement the underlying
[`ReadableStreamInterface`](https://github.com/reactphp/stream#readablestreaminterface) or
[`WritableStreamInterface`](https://github.com/reactphp/stream#writablestreaminterface),
you can use any of their events and methods as usual:

```php
$process = new Process($command);
$process->start($loop);

$process->stdout->on('data', function ($chunk) {
echo $chunk;
});
Expand Down Expand Up @@ -298,6 +309,54 @@ properties actually allow some fine grained control over process termination,
such as first trying a soft-close and then applying a force-close after a
timeout.

### Custom pipes

Following common Unix conventions, this library will start each child process
with the three pipes matching the standard I/O streams by default. For more
advanced use cases it may be useful to pass in custom pipes, such as explicitly
passing additional file descriptors (FDs) or overriding default process pipes.

Note that passing custom pipes is considered advanced usage and requires a
more in-depth understanding of Unix file descriptors and how they are inherited
to child processes and shared in multi-processing applications.

If you do not want to use the default standard I/O pipes, you can explicitly
pass an array containing the file descriptor specification to the constructor
like this:

```php
$fds = array(
// standard I/O pipes for stdin/stdout/stderr
0 => array('pipe', 'r'),
1 => array('pipe', 'w'),
2 => array('pipe', 'w'),

// example FDs for files or open resources
4 => array('file', '/dev/null', 'r'),
6 => fopen('log.txt','a'),
8 => STDERR,

// example FDs for sockets
10 => fsockopen('localhost', 8080),
12 => stream_socket_server('tcp://0.0.0.0:4711')
);

$process = new Process($cmd, null, null, $fds);
$process->start($loop);
```

Unless your use case has special requirements that demand otherwise, you're
highly recommended to (at least) pass in the standard I/O pipes as given above.
The file descriptor specification accepts arguments in the exact same format
as the underlying [`proc_open()`](http://php.net/proc_open) function.

Once the process is started, the `$pipes` array will always contain references to
all pipes as configured and the standard I/O references will always be set to
reference the pipes matching common Unix conventions. This library supports any
number of pipes and additional file descriptors, but many common applications
being run as a child process will expect that the parent process properly
assigns these file descriptors.

### Sigchild Compatibility

Internally, this project uses a work-around to improve compatibility when PHP
Expand Down
23 changes: 23 additions & 0 deletions examples/21-fds.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

use React\EventLoop\Factory;
use React\ChildProcess\Process;

require __DIR__ . '/../vendor/autoload.php';

$loop = Factory::create();

$process = new Process('exec 0>&- 2>&-;exec ls -la /proc/self/fd', null, null, array(
1 => array('pipe', 'w')
));
$process->start($loop);

$process->stdout->on('data', function ($chunk) {
echo $chunk;
});

$process->on('exit', function ($code) {
echo 'EXIT with code ' . $code . PHP_EOL;
});

$loop->run();
77 changes: 52 additions & 25 deletions src/Process.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,25 @@
class Process extends EventEmitter
{
/**
* @var ?WritableStreamInterface
* @var WritableStreamInterface|null|ReadableStreamInterface
*/
public $stdin;

/**
* @var ?ReadableStreamInterface
* @var ReadableStreamInterface|null|WritableStreamInterface
*/
public $stdout;

/**
* @var ?ReadableStreamInterface
* @var ReadableStreamInterface|null|WritableStreamInterface
*/
public $stderr;

/**
* Array with all process pipes (once started)
*
* Unless explicitly configured otherwise during construction, the following
* standard I/O pipes will be assigned by default:
* - 0: STDIN (`WritableStreamInterface`)
* - 1: STDOUT (`ReadableStreamInterface`)
* - 2: STDERR (`ReadableStreamInterface`)
Expand All @@ -47,6 +50,8 @@ class Process extends EventEmitter
private $cmd;
private $cwd;
private $env;
private $fds;

private $enhanceSigchildCompatibility;
private $sigchildPipe;

Expand All @@ -65,9 +70,10 @@ class Process extends EventEmitter
* @param string $cmd Command line to run
* @param null|string $cwd Current working directory or null to inherit
* @param null|array $env Environment variables or null to inherit
* @param null|array $fds File descriptors to allocate for this process (or null = default STDIO streams)
* @throws \LogicException On windows or when proc_open() is not installed
*/
public function __construct($cmd, $cwd = null, array $env = null)
public function __construct($cmd, $cwd = null, array $env = null, array $fds = null)
{
if (substr(strtolower(PHP_OS), 0, 3) === 'win') {
throw new \LogicException('Windows isn\'t supported due to the blocking nature of STDIN/STDOUT/STDERR pipes.');
Expand All @@ -87,18 +93,27 @@ public function __construct($cmd, $cwd = null, array $env = null)
}
}

if ($fds === null) {
$fds = array(
array('pipe', 'r'), // stdin
array('pipe', 'w'), // stdout
array('pipe', 'w'), // stderr
);
}

$this->fds = $fds;
$this->enhanceSigchildCompatibility = self::isSigchildEnabled();
}

/**
* Start the process.
*
* After the process is started, the standard IO streams will be constructed
* and available via public properties. STDIN will be paused upon creation.
* After the process is started, the standard I/O streams will be constructed
* and available via public properties.
*
* @param LoopInterface $loop Loop interface for stream construction
* @param float $interval Interval to periodically monitor process state (seconds)
* @throws RuntimeException If the process is already running or fails to start
* @throws \RuntimeException If the process is already running or fails to start
*/
public function start(LoopInterface $loop, $interval = 0.1)
{
Expand All @@ -107,17 +122,22 @@ public function start(LoopInterface $loop, $interval = 0.1)
}

$cmd = $this->cmd;
$fdSpec = array(
array('pipe', 'r'), // stdin
array('pipe', 'w'), // stdout
array('pipe', 'w'), // stderr
);

$fdSpec = $this->fds;
$sigchild = null;

// Read exit code through fourth pipe to work around --enable-sigchild
if ($this->enhanceSigchildCompatibility) {
$fdSpec[] = array('pipe', 'w');
$sigchild = 3;
\end($fdSpec);
$sigchild = \key($fdSpec);

// make sure this is fourth or higher (do not mess with STDIO)
if ($sigchild < 3) {
$fdSpec[3] = $fdSpec[$sigchild];
unset($fdSpec[$sigchild]);
$sigchild = 3;
}

$cmd = sprintf('(%s) ' . $sigchild . '>/dev/null; code=$?; echo $code >&' . $sigchild . '; exit $code', $cmd);
}

Expand All @@ -127,13 +147,13 @@ public function start(LoopInterface $loop, $interval = 0.1)
throw new \RuntimeException('Unable to launch a new process.');
}

$closeCount = 0;

// count open process pipes and await close event for each to drain buffers before detecting exit
$that = $this;
$closeCount = 0;
$streamCloseHandler = function () use (&$closeCount, $loop, $interval, $that) {
$closeCount++;
$closeCount--;

if ($closeCount < 2) {
if ($closeCount > 0) {
return;
}

Expand All @@ -160,18 +180,25 @@ public function start(LoopInterface $loop, $interval = 0.1)
}

foreach ($pipes as $n => $fd) {
if ($n === 0) {
$meta = \stream_get_meta_data($fd);
if (\strpos($meta['mode'], 'w') !== false) {
$stream = new WritableResourceStream($fd, $loop);
} else {
$stream = new ReadableResourceStream($fd, $loop);
$stream->on('close', $streamCloseHandler);
$closeCount++;
}
$this->pipes[$n] = $stream;
}

$this->stdin = $this->pipes[0];
$this->stdout = $this->pipes[1];
$this->stderr = $this->pipes[2];
$this->stdin = isset($this->pipes[0]) ? $this->pipes[0] : null;
$this->stdout = isset($this->pipes[1]) ? $this->pipes[1] : null;
$this->stderr = isset($this->pipes[2]) ? $this->pipes[2] : null;

// immediately start checking for process exit when started without any I/O pipes
if (!$closeCount) {
$streamCloseHandler();
}
}

/**
Expand All @@ -186,9 +213,9 @@ public function close()
return;
}

$this->stdin->close();
$this->stdout->close();
$this->stderr->close();
foreach ($this->pipes as $pipe) {
$pipe->close();
}

if ($this->enhanceSigchildCompatibility) {
$this->pollExitCodePipe();
Expand Down
42 changes: 42 additions & 0 deletions tests/AbstractProcessTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,33 @@ public function testStartWillAssignPipes()
$this->assertSame($process->stderr, $process->pipes[2]);
}

public function testStartWithoutAnyPipesWillNotAssignPipes()
{
$process = new Process('exit 0', null, null, array());
$process->start($this->createLoop());

$this->assertNull($process->stdin);
$this->assertNull($process->stdout);
$this->assertNull($process->stderr);
$this->assertEquals(array(), $process->pipes);
}

public function testStartWithCustomPipesWillAssignPipes()
{
$process = new Process('exit 0', null, null, array(
0 => array('pipe', 'w'),
3 => array('pipe', 'r')
));
$process->start($this->createLoop());

$this->assertInstanceOf('React\Stream\ReadableStreamInterface', $process->stdin);
$this->assertNull($process->stdout);
$this->assertNull($process->stderr);
$this->assertCount(2, $process->pipes);
$this->assertSame($process->stdin, $process->pipes[0]);
$this->assertInstanceOf('React\Stream\WritableStreamInterface', $process->pipes[3]);
}

public function testIsRunning()
{
$process = new Process('sleep 1');
Expand Down Expand Up @@ -333,6 +360,21 @@ public function testDetectsClosingProcessEvenWhenAllStdioPipesHaveBeenClosed()
$time = microtime(true) - $time;

$this->assertLessThan(0.1, $time);
$this->assertSame(0, $process->getExitCode());
}

public function testDetectsClosingProcessEvenWhenStartedWithoutPipes()
{
$loop = $this->createLoop();
$process = new Process('exit 0', null, null, array());
$process->start($loop, 0.001);

$time = microtime(true);
$loop->run();
$time = microtime(true) - $time;

$this->assertLessThan(0.1, $time);
$this->assertSame(0, $process->getExitCode());
}

public function testStartInvalidProcess()
Expand Down