Skip to content

Commit 8095cd1

Browse files
authored
Merge pull request #482 from clue-labs/include-buffer
Include buffer logic to avoid dependency on reactphp/promise-stream
2 parents aa75bcd + bafa2af commit 8095cd1

File tree

5 files changed

+346
-57
lines changed

5 files changed

+346
-57
lines changed

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
"psr/http-message": "^1.0",
3333
"react/event-loop": "^1.2",
3434
"react/promise": "^3 || ^2.3 || ^1.2.1",
35-
"react/promise-stream": "^1.4",
3635
"react/socket": "^1.12",
3736
"react/stream": "^1.2",
3837
"ringcentral/psr7": "^1.2"
@@ -43,6 +42,7 @@
4342
"clue/socks-react": "^1.4",
4443
"phpunit/phpunit": "^9.5 || ^5.7 || ^4.8.35",
4544
"react/async": "^4 || ^3 || ^2",
45+
"react/promise-stream": "^1.4",
4646
"react/promise-timer": "^1.9"
4747
},
4848
"autoload": {

src/Io/Transaction.php

Lines changed: 45 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
use React\Http\Message\Response;
1010
use React\Http\Message\ResponseException;
1111
use React\Promise\Deferred;
12+
use React\Promise\Promise;
1213
use React\Promise\PromiseInterface;
1314
use React\Stream\ReadableStreamInterface;
1415
use RingCentral\Psr7\Uri;
@@ -165,46 +166,67 @@ function (ResponseInterface $response) use ($request, $that, $deferred, $state)
165166
*/
166167
public function bufferResponse(ResponseInterface $response, Deferred $deferred, ClientRequestState $state)
167168
{
168-
$stream = $response->getBody();
169+
$body = $response->getBody();
170+
$size = $body->getSize();
169171

170-
$size = $stream->getSize();
171172
if ($size !== null && $size > $this->maximumSize) {
172-
$stream->close();
173+
$body->close();
173174
return \React\Promise\reject(new \OverflowException(
174175
'Response body size of ' . $size . ' bytes exceeds maximum of ' . $this->maximumSize . ' bytes',
175-
\defined('SOCKET_EMSGSIZE') ? \SOCKET_EMSGSIZE : 0
176+
\defined('SOCKET_EMSGSIZE') ? \SOCKET_EMSGSIZE : 90
176177
));
177178
}
178179

179180
// body is not streaming => already buffered
180-
if (!$stream instanceof ReadableStreamInterface) {
181+
if (!$body instanceof ReadableStreamInterface) {
181182
return \React\Promise\resolve($response);
182183
}
183184

184-
// buffer stream and resolve with buffered body
185+
/** @var ?\Closure $closer */
186+
$closer = null;
185187
$maximumSize = $this->maximumSize;
186-
$promise = \React\Promise\Stream\buffer($stream, $maximumSize)->then(
187-
function ($body) use ($response) {
188-
return $response->withBody(new BufferedBody($body));
189-
},
190-
function ($e) use ($stream, $maximumSize) {
191-
// try to close stream if buffering fails (or is cancelled)
192-
$stream->close();
193188

194-
if ($e instanceof \OverflowException) {
195-
$e = new \OverflowException(
189+
return $state->pending = new Promise(function ($resolve, $reject) use ($body, $maximumSize, $response, &$closer) {
190+
// resolve with current buffer when stream closes successfully
191+
$buffer = '';
192+
$body->on('close', $closer = function () use (&$buffer, $response, $maximumSize, $resolve, $reject) {
193+
$resolve($response->withBody(new BufferedBody($buffer)));
194+
});
195+
196+
// buffer response body data in memory
197+
$body->on('data', function ($data) use (&$buffer, $maximumSize, $body, $closer, $reject) {
198+
$buffer .= $data;
199+
200+
// close stream and reject promise if limit is exceeded
201+
if (isset($buffer[$maximumSize])) {
202+
$buffer = '';
203+
assert($closer instanceof \Closure);
204+
$body->removeListener('close', $closer);
205+
$body->close();
206+
207+
$reject(new \OverflowException(
196208
'Response body size exceeds maximum of ' . $maximumSize . ' bytes',
197-
\defined('SOCKET_EMSGSIZE') ? \SOCKET_EMSGSIZE : 0
198-
);
209+
\defined('SOCKET_EMSGSIZE') ? \SOCKET_EMSGSIZE : 90
210+
));
199211
}
212+
});
200213

201-
throw $e;
202-
}
203-
);
204-
205-
$state->pending = $promise;
214+
// reject buffering if body emits error
215+
$body->on('error', function (\Exception $e) use ($reject) {
216+
$reject(new \RuntimeException(
217+
'Error while buffering response body: ' . $e->getMessage(),
218+
$e->getCode(),
219+
$e
220+
));
221+
});
222+
}, function () use ($body, &$closer) {
223+
// cancelled buffering: remove close handler to avoid resolving, then close and reject
224+
assert($closer instanceof \Closure);
225+
$body->removeListener('close', $closer);
226+
$body->close();
206227

207-
return $promise;
228+
throw new \RuntimeException('Cancelled buffering response body');
229+
});
208230
}
209231

210232
/**

src/Middleware/RequestBodyBufferMiddleware.php

Lines changed: 57 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
use Psr\Http\Message\ServerRequestInterface;
77
use React\Http\Io\BufferedBody;
88
use React\Http\Io\IniUtil;
9-
use React\Promise\Stream;
9+
use React\Promise\Promise;
1010
use React\Stream\ReadableStreamInterface;
1111

1212
final class RequestBodyBufferMiddleware
@@ -29,19 +29,19 @@ public function __construct($sizeLimit = null)
2929
$this->sizeLimit = IniUtil::iniSizeToBytes($sizeLimit);
3030
}
3131

32-
public function __invoke(ServerRequestInterface $request, $stack)
32+
public function __invoke(ServerRequestInterface $request, $next)
3333
{
3434
$body = $request->getBody();
3535
$size = $body->getSize();
3636

3737
// happy path: skip if body is known to be empty (or is already buffered)
38-
if ($size === 0 || !$body instanceof ReadableStreamInterface) {
38+
if ($size === 0 || !$body instanceof ReadableStreamInterface || !$body->isReadable()) {
3939
// replace with empty body if body is streaming (or buffered size exceeds limit)
4040
if ($body instanceof ReadableStreamInterface || $size > $this->sizeLimit) {
4141
$request = $request->withBody(new BufferedBody(''));
4242
}
4343

44-
return $stack($request);
44+
return $next($request);
4545
}
4646

4747
// request body of known size exceeding limit
@@ -50,21 +50,60 @@ public function __invoke(ServerRequestInterface $request, $stack)
5050
$sizeLimit = 0;
5151
}
5252

53-
return Stream\buffer($body, $sizeLimit)->then(function ($buffer) use ($request, $stack) {
54-
$request = $request->withBody(new BufferedBody($buffer));
55-
56-
return $stack($request);
57-
}, function ($error) use ($stack, $request, $body) {
58-
// On buffer overflow keep the request body stream in,
59-
// but ignore the contents and wait for the close event
60-
// before passing the request on to the next middleware.
61-
if ($error instanceof OverflowException) {
62-
return Stream\first($body, 'close')->then(function () use ($stack, $request) {
63-
return $stack($request);
64-
});
65-
}
53+
/** @var ?\Closure $closer */
54+
$closer = null;
55+
56+
return new Promise(function ($resolve, $reject) use ($body, &$closer, $sizeLimit, $request, $next) {
57+
// buffer request body data in memory, discard but keep buffering if limit is reached
58+
$buffer = '';
59+
$bufferer = null;
60+
$body->on('data', $bufferer = function ($data) use (&$buffer, $sizeLimit, $body, &$bufferer) {
61+
$buffer .= $data;
62+
63+
// On buffer overflow keep the request body stream in,
64+
// but ignore the contents and wait for the close event
65+
// before passing the request on to the next middleware.
66+
if (isset($buffer[$sizeLimit])) {
67+
assert($bufferer instanceof \Closure);
68+
$body->removeListener('data', $bufferer);
69+
$bufferer = null;
70+
$buffer = '';
71+
}
72+
});
73+
74+
// call $next with current buffer and resolve or reject with its results
75+
$body->on('close', $closer = function () use (&$buffer, $request, $resolve, $reject, $next) {
76+
try {
77+
// resolve with result of next handler
78+
$resolve($next($request->withBody(new BufferedBody($buffer))));
79+
} catch (\Exception $e) {
80+
$reject($e);
81+
} catch (\Throwable $e) { // @codeCoverageIgnoreStart
82+
// reject Errors just like Exceptions (PHP 7+)
83+
$reject($e); // @codeCoverageIgnoreEnd
84+
}
85+
});
86+
87+
// reject buffering if body emits error
88+
$body->on('error', function (\Exception $e) use ($reject, $body, $closer) {
89+
// remove close handler to avoid resolving, then close and reject
90+
assert($closer instanceof \Closure);
91+
$body->removeListener('close', $closer);
92+
$body->close();
93+
94+
$reject(new \RuntimeException(
95+
'Error while buffering request body: ' . $e->getMessage(),
96+
$e->getCode(),
97+
$e
98+
));
99+
});
100+
}, function () use ($body, &$closer) {
101+
// cancelled buffering: remove close handler to avoid resolving, then close and reject
102+
assert($closer instanceof \Closure);
103+
$body->removeListener('close', $closer);
104+
$body->close();
66105

67-
throw $error;
106+
throw new \RuntimeException('Cancelled buffering request body');
68107
});
69108
}
70109
}

tests/Io/TransactionTest.php

Lines changed: 89 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,7 @@ public function testReceivingStreamingBodyWillResolveWithBufferedResponseByDefau
406406
$this->assertEquals('hello world', (string)$response->getBody());
407407
}
408408

409-
public function testReceivingStreamingBodyWithSizeExceedingMaximumResponseBufferWillRejectAndCloseResponseStream()
409+
public function testReceivingStreamingBodyWithContentLengthExceedingMaximumResponseBufferWillRejectAndCloseResponseStreamImmediately()
410410
{
411411
$stream = new ThroughStream();
412412
$stream->on('close', $this->expectCallableOnce());
@@ -419,11 +419,87 @@ public function testReceivingStreamingBodyWithSizeExceedingMaximumResponseBuffer
419419
$sender = $this->makeSenderMock();
420420
$sender->expects($this->once())->method('send')->with($this->equalTo($request))->willReturn(Promise\resolve($response));
421421

422+
$transaction = new Transaction($sender, Loop::get());
423+
424+
$promise = $transaction->send($request);
425+
426+
$exception = null;
427+
$promise->then(null, function ($e) use (&$exception) {
428+
$exception = $e;
429+
});
430+
431+
$this->assertFalse($stream->isWritable());
432+
433+
assert($exception instanceof \OverflowException);
434+
$this->assertInstanceOf('OverflowException', $exception);
435+
$this->assertEquals('Response body size of 100000000 bytes exceeds maximum of 16777216 bytes', $exception->getMessage());
436+
$this->assertEquals(defined('SOCKET_EMSGSIZE') ? \SOCKET_EMSGSIZE : 90, $exception->getCode());
437+
$this->assertNull($exception->getPrevious());
438+
}
439+
440+
public function testReceivingStreamingBodyWithContentsExceedingMaximumResponseBufferWillRejectAndCloseResponseStreamWhenBufferExceedsLimit()
441+
{
442+
$stream = new ThroughStream();
443+
$stream->on('close', $this->expectCallableOnce());
444+
445+
$request = $this->getMockBuilder('Psr\Http\Message\RequestInterface')->getMock();
446+
447+
$response = new Response(200, array(), new ReadableBodyStream($stream));
448+
449+
// mock sender to resolve promise with the given $response in response to the given $request
450+
$sender = $this->makeSenderMock();
451+
$sender->expects($this->once())->method('send')->with($this->equalTo($request))->willReturn(Promise\resolve($response));
452+
453+
$transaction = new Transaction($sender, Loop::get());
454+
$transaction = $transaction->withOptions(array('maximumSize' => 10));
455+
$promise = $transaction->send($request);
456+
457+
$exception = null;
458+
$promise->then(null, function ($e) use (&$exception) {
459+
$exception = $e;
460+
});
461+
462+
$this->assertTrue($stream->isWritable());
463+
$stream->write('hello wörld');
464+
$this->assertFalse($stream->isWritable());
465+
466+
assert($exception instanceof \OverflowException);
467+
$this->assertInstanceOf('OverflowException', $exception);
468+
$this->assertEquals('Response body size exceeds maximum of 10 bytes', $exception->getMessage());
469+
$this->assertEquals(defined('SOCKET_EMSGSIZE') ? \SOCKET_EMSGSIZE : 90, $exception->getCode());
470+
$this->assertNull($exception->getPrevious());
471+
}
472+
473+
public function testReceivingStreamingBodyWillRejectWhenStreamEmitsError()
474+
{
475+
$stream = new ThroughStream(function ($data) {
476+
throw new \UnexpectedValueException('Unexpected ' . $data, 42);
477+
});
478+
479+
$request = $this->getMockBuilder('Psr\Http\Message\RequestInterface')->getMock();
480+
$response = new Response(200, array(), new ReadableBodyStream($stream));
481+
482+
// mock sender to resolve promise with the given $response in response to the given $request
483+
$sender = $this->makeSenderMock();
484+
$sender->expects($this->once())->method('send')->with($this->equalTo($request))->willReturn(Promise\resolve($response));
485+
422486
$transaction = new Transaction($sender, Loop::get());
423487
$promise = $transaction->send($request);
424488

425-
$this->setExpectedException('OverflowException');
426-
\React\Async\await(\React\Promise\Timer\timeout($promise, 0.001));
489+
$exception = null;
490+
$promise->then(null, function ($e) use (&$exception) {
491+
$exception = $e;
492+
});
493+
494+
$this->assertTrue($stream->isWritable());
495+
$stream->write('Foo');
496+
$this->assertFalse($stream->isWritable());
497+
498+
assert($exception instanceof \RuntimeException);
499+
$this->assertInstanceOf('RuntimeException', $exception);
500+
$this->assertEquals('Error while buffering response body: Unexpected Foo', $exception->getMessage());
501+
$this->assertEquals(42, $exception->getCode());
502+
$this->assertInstanceOf('UnexpectedValueException', $exception->getPrevious());
427503
}
428504

429505
public function testCancelBufferingResponseWillCloseStreamAndReject()
@@ -446,8 +522,16 @@ public function testCancelBufferingResponseWillCloseStreamAndReject()
446522
$deferred->resolve($response);
447523
$promise->cancel();
448524

449-
$this->setExpectedException('RuntimeException');
450-
\React\Async\await(\React\Promise\Timer\timeout($promise, 0.001));
525+
$exception = null;
526+
$promise->then(null, function ($e) use (&$exception) {
527+
$exception = $e;
528+
});
529+
530+
assert($exception instanceof \RuntimeException);
531+
$this->assertInstanceOf('RuntimeException', $exception);
532+
$this->assertEquals('Cancelled buffering response body', $exception->getMessage());
533+
$this->assertEquals(0, $exception->getCode());
534+
$this->assertNull($exception->getPrevious());
451535
}
452536

453537
public function testReceivingStreamingBodyWillResolveWithStreamingResponseIfStreamingIsEnabled()

0 commit comments

Comments
 (0)