Skip to content

Commit c96a62f

Browse files
artembilanolegz
authored andcommitted
GH-2939: Rely on custom AmqpHeaders.RETRY_COUNT for server retries
Fixes: #2939 The RabbitMQ 4.0 does not deal with client side `x-*` headers. Therefore, an `x-death.count` is not incremented anymore when message is re-published from client back to the broker. * Spring AMQP 3.2 has introduced an `AmqpHeaders.RETRY_COUNT` custom header. Use `messageProperties.incrementRetryCount()` in the `RabbitMessageChannelBinder` when we re-published message back to the broker for server-side retries * Fix docs respectively Resolves #3019
1 parent bf9ea9e commit c96a62f

File tree

2 files changed

+18
-36
lines changed

2 files changed

+18
-36
lines changed

binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java

+1
Original file line numberDiff line numberDiff line change
@@ -815,6 +815,7 @@ private MessageProperties adjustMessagePropertiesHeader(Throwable cause, String
815815
messageProperties.setDeliveryMode(
816816
properties.getExtension().getRepublishDeliveyMode());
817817
}
818+
messageProperties.incrementRetryCount();
818819
return messageProperties;
819820
}
820821

docs/modules/ROOT/pages/rabbit/rabbit_dlq.adoc

+17-36
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@ public class ReRouteDlqApplication {
2828
2929
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
3030
31-
private static final String X_RETRIES_HEADER = "x-retries";
32-
3331
public static void main(String[] args) throws Exception {
3432
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
3533
System.out.println("Press enter to exit");
@@ -42,12 +40,9 @@ public class ReRouteDlqApplication {
4240
4341
@RabbitListener(queues = DLQ)
4442
public void rePublish(Message failedMessage) {
45-
Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
46-
if (retriesHeader == null) {
47-
retriesHeader = Integer.valueOf(0);
48-
}
49-
if (retriesHeader < 3) {
50-
failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
43+
long retries = failedMessage.getMessageProperties().getRetryCount();
44+
if (retries < 3) {
45+
failedMessage.getMessageProperties().incrementRetryCount();
5146
this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
5247
}
5348
else {
@@ -74,8 +69,6 @@ public class ReRouteDlqApplication {
7469
7570
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
7671
77-
private static final String X_RETRIES_HEADER = "x-retries";
78-
7972
private static final String DELAY_EXCHANGE = "dlqReRouter";
8073
8174
public static void main(String[] args) throws Exception {
@@ -90,13 +83,10 @@ public class ReRouteDlqApplication {
9083
9184
@RabbitListener(queues = DLQ)
9285
public void rePublish(Message failedMessage) {
93-
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
94-
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
95-
if (retriesHeader == null) {
96-
retriesHeader = Integer.valueOf(0);
97-
}
98-
if (retriesHeader < 3) {
99-
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
86+
long retries = failedMessage.getMessageProperties().getRetryCount();
87+
if (retries < 3) {
88+
failedMessage.getMessageProperties().incrementRetryCount();
89+
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
10090
headers.put("x-delay", 5000 * retriesHeader);
10191
this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
10292
}
@@ -128,9 +118,10 @@ public class ReRouteDlqApplication {
128118
[[partitioned-destinations]]
129119
== Partitioned Destinations
130120

131-
With partitioned destinations, there is one DLQ for all partitions. We determine the original queue from the headers.
121+
With partitioned destinations, there is one DLQ for all partitions.
122+
We determine the original queue from the headers.
132123

133-
[[republishtodlq=false]]
124+
[[republishtodlq-false]]
134125
=== `republishToDlq=false`
135126

136127
When `republishToDlq` is `false`, RabbitMQ publishes the message to the DLX/DLQ with an `x-death` header containing information about the original destination, as shown in the following example:
@@ -148,8 +139,6 @@ public class ReRouteDlqApplication {
148139
149140
private static final String X_DEATH_HEADER = "x-death";
150141
151-
private static final String X_RETRIES_HEADER = "x-retries";
152-
153142
public static void main(String[] args) throws Exception {
154143
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
155144
System.out.println("Press enter to exit");
@@ -164,12 +153,9 @@ public class ReRouteDlqApplication {
164153
@RabbitListener(queues = DLQ)
165154
public void rePublish(Message failedMessage) {
166155
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
167-
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
168-
if (retriesHeader == null) {
169-
retriesHeader = Integer.valueOf(0);
170-
}
171-
if (retriesHeader < 3) {
172-
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
156+
long retries = failedMessage.getMessageProperties().getRetryCount();
157+
if (retries < 3) {
158+
failedMessage.getMessageProperties().incrementRetryCount();
173159
List<Map<String, ?>> xDeath = (List<Map<String, ?>>) headers.get(X_DEATH_HEADER);
174160
String exchange = (String) xDeath.get(0).get("exchange");
175161
List<String> routingKeys = (List<String>) xDeath.get(0).get("routing-keys");
@@ -188,7 +174,7 @@ public class ReRouteDlqApplication {
188174
}
189175
----
190176

191-
[[republishtodlq=true]]
177+
[[republishtodlq-true]]
192178
=== `republishToDlq=true`
193179

194180
When `republishToDlq` is `true`, the republishing recoverer adds the original exchange and routing key to headers, as shown in the following example:
@@ -204,8 +190,6 @@ public class ReRouteDlqApplication {
204190
205191
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
206192
207-
private static final String X_RETRIES_HEADER = "x-retries";
208-
209193
private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;
210194
211195
private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;
@@ -223,12 +207,9 @@ public class ReRouteDlqApplication {
223207
@RabbitListener(queues = DLQ)
224208
public void rePublish(Message failedMessage) {
225209
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
226-
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
227-
if (retriesHeader == null) {
228-
retriesHeader = Integer.valueOf(0);
229-
}
230-
if (retriesHeader < 3) {
231-
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
210+
long retries = failedMessage.getMessageProperties().getRetryCount();
211+
if (retries < 3) {
212+
failedMessage.getMessageProperties().incrementRetryCount();
232213
String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
233214
String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
234215
this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);

0 commit comments

Comments
 (0)