Skip to content

Commit 9c0a1a6

Browse files
authored
Merge pull request #154 from afdolriski/retry-sqs-fifo-job
Retry sqs fifo job
2 parents 1796c69 + 463c93f commit 9c0a1a6

File tree

4 files changed

+161
-3
lines changed

4 files changed

+161
-3
lines changed

.gitattributes

+1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
/.github export-ignore
44
/examples export-ignore
5+
/tests export-ignore
56
.codespellrc export-ignore
67
.editorconfig export-ignore
78
.gitattributes export-ignore

composer.json

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
"nunomaduro/larastan": "^2.2",
2828
"orchestra/testbench": "^7.13 || ^9.0 || ^10.0",
2929
"php-parallel-lint/php-parallel-lint": "^1.3",
30+
"phpunit/phpunit": "^9.5",
3031
"squizlabs/php_codesniffer": "^3.7"
3132
},
3233
"minimum-stability": "dev",

src/Queue/SqsJob.php

+23-3
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Bref\LaravelBridge\Queue;
44

55
use Illuminate\Queue\Jobs\SqsJob as LaravelSqsJob;
6+
use Illuminate\Support\Str;
67

78
class SqsJob extends LaravelSqsJob
89
{
@@ -21,11 +22,19 @@ public function release($delay = 0)
2122
'ReceiptHandle' => $this->job['ReceiptHandle'],
2223
]);
2324

24-
$this->sqs->sendMessage([
25+
$sqsMessage = [
2526
'QueueUrl' => $this->queue,
2627
'MessageBody' => json_encode($payload),
27-
'DelaySeconds' => $this->secondsUntil($delay),
28-
]);
28+
'DelaySeconds' => $this->secondsUntil($delay)
29+
];
30+
31+
if (Str::endsWith($this->queue, '.fifo')) {
32+
$sqsMessage['MessageGroupId'] = $this->job['Attributes']['MessageGroupId'];
33+
$sqsMessage['MessageDeduplicationId'] = $this->parseDeduplicationId($payload['attempts']);
34+
unset($sqsMessage["DelaySeconds"]);
35+
}
36+
37+
$this->sqs->sendMessage($sqsMessage);
2938
}
3039

3140
/**
@@ -35,4 +44,15 @@ public function attempts()
3544
{
3645
return ($this->payload()['attempts'] ?? 0) + 1;
3746
}
47+
48+
/**
49+
* Create new MessageDeduplicationId
50+
* appending attempt at the end so the message will not be ignored
51+
*
52+
* https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html#API_SendMessage_RequestSyntax
53+
*/
54+
private function parseDeduplicationId($attempts)
55+
{
56+
return $this->job['Attributes']['MessageDeduplicationId'] . '-' . $attempts;
57+
}
3858
}

tests/Queue/SqsJobTest.php

+136
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
use Bref\LaravelBridge\Queue\SqsJob;
6+
use Aws\Sqs\SqsClient;
7+
use Illuminate\Container\Container;
8+
use Mockery as m;
9+
use PHPUnit\Framework\TestCase;
10+
11+
class SqsJobTest extends TestCase
12+
{
13+
protected $account;
14+
protected $queueName;
15+
protected $baseUrl;
16+
protected $releaseDelay;
17+
protected $queueUrl;
18+
protected $mockedSqsClient;
19+
protected $mockedContainer;
20+
protected $mockedJob;
21+
protected $mockedData;
22+
protected $mockedPayload;
23+
protected $mockedMessageId;
24+
protected $mockedReceiptHandle;
25+
protected $mockedJobData;
26+
27+
protected function setUp(): void
28+
{
29+
$this->account = '1234567891011';
30+
$this->queueName = 'emails';
31+
$this->baseUrl = 'https://sqs.someregion.amazonaws.com';
32+
$this->releaseDelay = 0;
33+
34+
// This is how the modified getQueue builds the queueUrl
35+
$this->queueUrl = $this->baseUrl.'/'.$this->account.'/'.$this->queueName;
36+
37+
// Get a mock of the SqsClient
38+
$this->mockedSqsClient = m::mock(SqsClient::class);
39+
40+
// Use Mockery to mock the IoC Container
41+
$this->mockedContainer = m::mock(Container::class);
42+
43+
$this->mockedJob = 'foo';
44+
$this->mockedData = ['data'];
45+
$this->mockedPayload = json_encode(['job' => $this->mockedJob, 'data' => $this->mockedData, 'attempts' => 1]);
46+
$this->mockedMessageId = 'e3cd03ee-59a3-4ad8-b0aa-ee2e3808ac81';
47+
$this->mockedReceiptHandle = '0NNAq8PwvXuWv5gMtS9DJ8qEdyiUwbAjpp45w2m6M4SJ1Y+PxCh7R930NRB8ylSacEmoSnW18bgd4nK\/O6ctE+VFVul4eD23mA07vVoSnPI4F\/voI1eNCp6Iax0ktGmhlNVzBwaZHEr91BRtqTRM3QKd2ASF8u+IQaSwyl\/DGK+P1+dqUOodvOVtExJwdyDLy1glZVgm85Yw9Jf5yZEEErqRwzYz\/qSigdvW4sm2l7e4phRol\/+IjMtovOyH\/ukueYdlVbQ4OshQLENhUKe7RNN5i6bE\/e5x9bnPhfj2gbM';
48+
}
49+
50+
protected function tearDown(): void
51+
{
52+
m::close();
53+
}
54+
55+
public function testProperlyReleaseStandardSqs()
56+
{
57+
$job = $this->createJob();
58+
$job->getSqs()
59+
->shouldReceive('deleteMessage')
60+
->with(['QueueUrl' => $this->queueUrl, 'ReceiptHandle' => $this->mockedReceiptHandle])
61+
->once();
62+
$job->getSqs()
63+
->shouldReceive('sendMessage')
64+
->with(
65+
$this->logicalAnd(
66+
$this->arrayHasKey("MessageBody"),
67+
$this->arrayHasKey("QueueUrl"),
68+
),
69+
)
70+
->once();
71+
$job->release($this->releaseDelay);
72+
$this->assertTrue($job->isReleased());
73+
}
74+
75+
public function testProperlyReleaseFifoSqs()
76+
{
77+
$job = $this->createFifoJob();
78+
$job->getSqs()
79+
->shouldReceive('deleteMessage')
80+
->with(['QueueUrl' => $this->queueUrl.'.fifo', 'ReceiptHandle' => $this->mockedReceiptHandle])
81+
->once();
82+
$job->getSqs()
83+
->shouldReceive('sendMessage')
84+
->with(
85+
$this->logicalAnd(
86+
$this->arrayHasKey("MessageBody"),
87+
$this->arrayHasKey("QueueUrl"),
88+
$this->arrayHasKey("MessageGroupId"),
89+
$this->arrayHasKey("MessageDeduplicationId"),
90+
),
91+
)
92+
->once();
93+
$job->release($this->releaseDelay);
94+
$this->assertTrue($job->isReleased());
95+
}
96+
97+
protected function createJob()
98+
{
99+
$jobData = [
100+
'Body' => $this->mockedPayload,
101+
'MD5OfBody' => md5($this->mockedPayload),
102+
'ReceiptHandle' => $this->mockedReceiptHandle,
103+
'MessageId' => $this->mockedMessageId,
104+
'Attributes' => ['ApproximateReceiveCount' => 1],
105+
];
106+
return new SqsJob(
107+
$this->mockedContainer,
108+
$this->mockedSqsClient,
109+
$jobData,
110+
'connection-name',
111+
$this->queueUrl
112+
);
113+
}
114+
115+
protected function createFifoJob()
116+
{
117+
$jobData = [
118+
'Body' => $this->mockedPayload,
119+
'MD5OfBody' => md5($this->mockedPayload),
120+
'ReceiptHandle' => $this->mockedReceiptHandle,
121+
'MessageId' => $this->mockedMessageId,
122+
'Attributes' => [
123+
'ApproximateReceiveCount' => 1,
124+
'MessageGroupId' => 'group1',
125+
'MessageDeduplicationId' => 'deduplication1'
126+
],
127+
];
128+
return new SqsJob(
129+
$this->mockedContainer,
130+
$this->mockedSqsClient,
131+
$jobData,
132+
'connection-name',
133+
$this->queueUrl.'.fifo'
134+
);
135+
}
136+
}

0 commit comments

Comments
 (0)