Skip to content

Commit 7b393b4

Browse files
authored
Merge pull request #11 from clue-labs/consume-stdout
Only start consuming STDOUT data once connection is ready
2 parents c1732ba + 3d0cda6 commit 7b393b4

File tree

2 files changed

+114
-3
lines changed

2 files changed

+114
-3
lines changed

src/SshProcessConnector.php

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,18 +69,24 @@ public function connect($uri)
6969
$process = Io\processWithoutFds($command);
7070
$process->start($this->loop);
7171

72+
if ($this->debug) {
73+
echo 'Launched "' . $command . '" with PID ' . $process->getPid() . PHP_EOL; // @codeCoverageIgnore
74+
}
75+
7276
$deferred = new Deferred(function () use ($process, $uri) {
7377
$process->stdin->close();
7478
$process->terminate();
7579

7680
throw new \RuntimeException('Connection to ' . $uri . ' cancelled while waiting for SSH client');
7781
});
7882

79-
// process STDERR one line at a time
83+
// pause STDOUT and process STDERR one line at a time until connection is ready
84+
$process->stdout->pause();
8085
$last = null;
86+
$connected = false;
8187
$debug = $this->debug;
8288
$stderr = new LineSeparatedReader($process->stderr);
83-
$stderr->on('data', function ($line) use ($deferred, $process, $uri, &$last, $debug) {
89+
$stderr->on('data', function ($line) use ($deferred, $process, $uri, &$last, $debug, &$connected) {
8490
// remember last line for error output in case process exits
8591
$last = $line;
8692

@@ -109,11 +115,26 @@ public function connect($uri)
109115
return;
110116
}
111117

118+
// channel is ready, so resume STDOUT stream and resolve connection
119+
$process->stdout->resume();
112120
$connection = new CompositeConnection($process->stdout, $process->stdin);
113121
$deferred->resolve($connection);
122+
$connected = true;
114123
});
115124

116-
$process->on('exit', function ($code) use ($deferred, $uri, &$last) {
125+
// If STDERR closes before connection was established, explicitly close STDOUT stream.
126+
// The STDOUT stream starts in a paused state and as such will prevent the process exit
127+
// logic from triggering when it is not resumed.
128+
$stderr->on('close', function () use ($process, &$connected) {
129+
if (!$connected) {
130+
$process->stdout->close();
131+
}
132+
});
133+
134+
$process->on('exit', function ($code) use ($deferred, $uri, &$last, $debug) {
135+
if ($debug) {
136+
echo 'Process exit with code ' . $code . PHP_EOL; // @codeCoverageIgnore
137+
}
117138
$deferred->reject(new \RuntimeException(
118139
'Connection to ' . $uri . ' failed because SSH client died (' . $last . ')',
119140
$code
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
<?php
2+
3+
use Clue\React\SshProxy\SshProcessConnector;
4+
use PHPUnit\Framework\TestCase;
5+
use React\EventLoop\Factory;
6+
use React\Socket\ConnectionInterface;
7+
8+
class IntegrationSshProcessConnectorTest extends TestCase
9+
{
10+
public function testConnectWillResolveWithConnectionInterfaceWhenProcessOutputsChannelOpenConfirmMessage()
11+
{
12+
$loop = Factory::create();
13+
$connector = new SshProcessConnector('host', $loop);
14+
15+
$ref = new ReflectionProperty($connector, 'cmd');
16+
$ref->setAccessible(true);
17+
$ref->setValue($connector, 'echo "debug2: channel 0: open confirm rwindow 2097152 rmax 32768" >&2; #');
18+
19+
$promise = $connector->connect('example.com:80');
20+
$promise->then($this->expectCallableOnceWith($this->isInstanceOf('React\Socket\ConnectionInterface')));
21+
22+
$loop->run();
23+
}
24+
25+
public function testConnectWillRejectWithExceptionWhenProcessOutputsChannelOpenFailedMessage()
26+
{
27+
$loop = Factory::create();
28+
$connector = new SshProcessConnector('host', $loop);
29+
30+
$ref = new ReflectionProperty($connector, 'cmd');
31+
$ref->setAccessible(true);
32+
$ref->setValue($connector, 'echo "channel 0: open failed: administratively prohibited: open failed" >&2; #');
33+
34+
$promise = $connector->connect('example.com:80');
35+
$promise->then(null, $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException')));
36+
37+
$loop->run();
38+
}
39+
40+
public function testConnectWillRejectWithExceptionWhenProcessOutputsEndsWithoutChannelMessage()
41+
{
42+
$loop = Factory::create();
43+
$connector = new SshProcessConnector('host', $loop);
44+
45+
$ref = new ReflectionProperty($connector, 'cmd');
46+
$ref->setAccessible(true);
47+
$ref->setValue($connector, 'echo foo >&2; #');
48+
49+
$promise = $connector->connect('example.com:80');
50+
$promise->then(null, $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException')));
51+
52+
$loop->run();
53+
}
54+
55+
public function testConnectWillResolveWithConnectionThatWillEmitImmediateDataFromProcessStdoutAfterChannelOpenConfirmMessage()
56+
{
57+
$loop = Factory::create();
58+
$connector = new SshProcessConnector('host', $loop);
59+
60+
$ref = new ReflectionProperty($connector, 'cmd');
61+
$ref->setAccessible(true);
62+
$ref->setValue($connector, 'echo "debug2: channel 0: open confirm rwindow 2097152 rmax 32768" >&2; echo foo #');
63+
64+
$promise = $connector->connect('example.com:80');
65+
66+
$data = $this->expectCallableOnceWith("foo\n");
67+
$promise->then(function (ConnectionInterface $connection) use ($data) {
68+
$connection->on('data', $data);
69+
});
70+
71+
$loop->run();
72+
}
73+
74+
protected function expectCallableOnceWith($value)
75+
{
76+
$mock = $this->createCallableMock();
77+
78+
$mock
79+
->expects($this->once())
80+
->method('__invoke')
81+
->with($value);
82+
83+
return $mock;
84+
}
85+
86+
protected function createCallableMock()
87+
{
88+
return $this->getMockBuilder('stdClass')->setMethods(array('__invoke'))->getMock();
89+
}
90+
}

0 commit comments

Comments
 (0)