Skip to content

Commit ab4b100

Browse files
authored
Fix sequence counting (#30)
1 parent d5303e6 commit ab4b100

File tree

5 files changed

+48
-12
lines changed

5 files changed

+48
-12
lines changed

.github/workflows/ci-build.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,4 @@ jobs:
5353
token: ${{ secrets.CODECOV_TOKEN }}
5454
file: ./coverage.xml
5555
flags: php
56-
fail_ci_if_error: false
56+
fail_ci_if_error: false

examples/swoole.php

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
use Spiral\Goridge;
6+
use Swoole\Coroutine as Co;
7+
use Swoole\Coroutine\Barrier;
8+
9+
require 'vendor/autoload.php';
10+
11+
Co::set(['hook_flags'=> SWOOLE_HOOK_ALL]);
12+
Co\Run(function () {
13+
$barrier = Barrier::make();
14+
for ($i = 0; $i < 3; $i++) {
15+
go(function () use ($barrier) {
16+
$rpc = new Goridge\RPC\RPC(
17+
Goridge\Relay::create('tcp://127.0.0.1:6001')
18+
);
19+
echo $rpc->call('App.Hi', 'Antony');
20+
});
21+
}
22+
Barrier::wait($barrier);
23+
});

src/RPC/RPC.php

+6-11
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,6 @@ class RPC implements RPCInterface
3030
*/
3131
private ?string $service = null;
3232

33-
/**
34-
* @var positive-int
35-
*/
36-
private static int $seq = 1;
37-
3833
/**
3934
* @param RelayInterface $relay
4035
* @param CodecInterface|null $codec
@@ -76,7 +71,9 @@ public function withCodec(CodecInterface $codec): RPCInterface
7671
*/
7772
public function call(string $method, $payload, $options = null)
7873
{
79-
$this->relay->send($this->packFrame($method, $payload));
74+
$seq = $this->relay->getNextSeq();
75+
76+
$this->relay->send($this->packFrame($method, $payload, $seq));
8077

8178
// wait for the frame confirmation
8279
$frame = $this->relay->waitFrame();
@@ -85,12 +82,10 @@ public function call(string $method, $payload, $options = null)
8582
throw new RPCException('Invalid RPC frame, options missing');
8683
}
8784

88-
if ($frame->options[0] !== self::$seq) {
85+
if ($frame->options[0] !== $seq) {
8986
throw new RPCException('Invalid RPC frame, sequence mismatch');
9087
}
9188

92-
self::$seq++;
93-
9489
return $this->decodeResponse($frame, $options);
9590
}
9691

@@ -163,13 +158,13 @@ private function decodeResponse(Frame $frame, $options = null)
163158
* @param mixed $payload
164159
* @return Frame
165160
*/
166-
private function packFrame(string $method, $payload): Frame
161+
private function packFrame(string $method, $payload, int $seq): Frame
167162
{
168163
if ($this->service !== null) {
169164
$method = $this->service . '.' . \ucfirst($method);
170165
}
171166

172167
$body = $method . $this->codec->encode($payload);
173-
return new Frame($body, [self::$seq, \strlen($method)], $this->codec->getIndex());
168+
return new Frame($body, [$seq, \strlen($method)], $this->codec->getIndex());
174169
}
175170
}

src/Relay.php

+13
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@ abstract class Relay implements RelayInterface
1313
public const PIPES = 'pipes';
1414
protected const CONNECTION_EXP = '/(?P<protocol>[^:\/]+):\/\/(?P<arg1>[^:]+)(:(?P<arg2>[^:]+))?/';
1515

16+
/**
17+
* @var int
18+
*/
19+
private int $seq = 1;
20+
1621
/**
1722
* Create relay using string address.
1823
*
@@ -93,4 +98,12 @@ private static function openOut(string $output)
9398

9499
return $resource;
95100
}
101+
102+
/**
103+
* @return int
104+
*/
105+
public function getNextSeq(): int
106+
{
107+
return $this->seq++;
108+
}
96109
}

src/RelayInterface.php

+5
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,9 @@ public function waitFrame(): Frame;
2121
* @param Frame $frame
2222
*/
2323
public function send(Frame $frame): void;
24+
25+
/**
26+
* @return int
27+
*/
28+
public function getNextSeq(): int;
2429
}

0 commit comments

Comments
 (0)