Skip to content

Commit 6b2f48c

Browse files
garyrussellartembilan
authored andcommitted
AMQP-821: Repub Recoverer limit stack trace header
JIRA: https://jira.spring.io/browse/AMQP-821 Since headers are not fragmented, limit the stack trace header length to prevent failures. **cherry-pick to 2.0.x** # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitUtils.java * Revert AssertJ in the `RepublishMessageRecovererIntegrationTests`
1 parent 3b90050 commit 6b2f48c

File tree

6 files changed

+173
-11
lines changed

6 files changed

+173
-11
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -857,10 +857,13 @@ public Properties getPublisherConnectionFactoryCacheProperties() {
857857

858858
private void putConnectionName(Properties props, ConnectionProxy connection, String keySuffix) {
859859
Connection targetConnection = connection.getTargetConnection(); // NOSONAR (close())
860-
if (targetConnection instanceof SimpleConnection) {
861-
String name = ((SimpleConnection) targetConnection).getDelegate().getClientProvidedName();
862-
if (name != null) {
863-
props.put("connectionName" + keySuffix, name);
860+
if (targetConnection != null) {
861+
com.rabbitmq.client.Connection delegate = targetConnection.getDelegate();
862+
if (delegate != null) {
863+
String name = delegate.getClientProvidedName();
864+
if (name != null) {
865+
props.put("connectionName" + keySuffix, name);
866+
}
864867
}
865868
}
866869
}
@@ -1255,6 +1258,11 @@ public Connection getTargetConnection() {
12551258
return this.target;
12561259
}
12571260

1261+
@Override
1262+
public com.rabbitmq.client.Connection getDelegate() {
1263+
return this.target.getDelegate();
1264+
}
1265+
12581266
@Override
12591267
public int getLocalPort() {
12601268
Connection target = this.target; // NOSONAR (close)

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/Connection.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717
package org.springframework.amqp.rabbit.connection;
1818

1919
import org.springframework.amqp.AmqpException;
20+
import org.springframework.lang.Nullable;
2021

2122
import com.rabbitmq.client.BlockedListener;
2223
import com.rabbitmq.client.Channel;
@@ -26,7 +27,7 @@
2627
* @author Gary Russell
2728
* @author Artem Bilan
2829
*/
29-
public interface Connection {
30+
public interface Connection extends AutoCloseable {
3031

3132
/**
3233
* Create a new channel, using an internally allocated channel number.
@@ -45,6 +46,7 @@ public interface Connection {
4546
*
4647
* @throws AmqpException if an I/O problem is encountered
4748
*/
49+
@Override
4850
void close() throws AmqpException;
4951

5052
/**
@@ -78,4 +80,12 @@ public interface Connection {
7880
*/
7981
boolean removeBlockedListener(BlockedListener listener);
8082

83+
/**
84+
* Return the underlying RabbitMQ connection.
85+
* @return the connection.
86+
*/
87+
default @Nullable com.rabbitmq.client.Connection getDelegate() {
88+
return null;
89+
}
90+
8191
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitUtils.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,4 +341,22 @@ public static boolean shouldRequeue(boolean defaultRequeueRejected, Throwable th
341341
return shouldRequeue;
342342
}
343343

344+
/**
345+
* Return the negotiated frame_max.
346+
* @param connectionFactory the connection factory.
347+
* @return the size or -1 if it cannot be determined.
348+
*/
349+
public static int getMaxFrame(ConnectionFactory connectionFactory) {
350+
try (Connection connection = connectionFactory.createConnection()) {
351+
com.rabbitmq.client.Connection rcon = connection.getDelegate();
352+
if (rcon != null) {
353+
return rcon.getFrameMax();
354+
}
355+
}
356+
catch (RuntimeException e) {
357+
// NOSONAR
358+
}
359+
return -1;
360+
}
361+
344362
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/SimpleConnection.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,8 @@ public int getPort() {
139139
return this.delegate.getPort();
140140
}
141141

142-
com.rabbitmq.client.Connection getDelegate() {
142+
@Override
143+
public com.rabbitmq.client.Connection getDelegate() {
143144
return this.delegate;
144145
}
145146

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecoverer.java

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.springframework.amqp.core.Message;
2828
import org.springframework.amqp.core.MessageDeliveryMode;
2929
import org.springframework.amqp.core.MessageProperties;
30+
import org.springframework.amqp.rabbit.connection.RabbitUtils;
31+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
3032
import org.springframework.util.Assert;
3133

3234
/**
@@ -55,6 +57,8 @@ public class RepublishMessageRecoverer implements MessageRecoverer {
5557

5658
public static final String X_ORIGINAL_ROUTING_KEY = "x-original-routingKey";
5759

60+
public static final int DEFAULT_FRAME_MAX_HEADROOM = 20_000;
61+
5862
protected final Log logger = LogFactory.getLog(getClass());
5963

6064
protected final AmqpTemplate errorTemplate;
@@ -65,6 +69,10 @@ public class RepublishMessageRecoverer implements MessageRecoverer {
6569

6670
private String errorRoutingKeyPrefix = "error.";
6771

72+
private int frameMaxHeadroom = DEFAULT_FRAME_MAX_HEADROOM;
73+
74+
private volatile Integer maxStackTraceLength = -1;
75+
6876
private MessageDeliveryMode deliveryMode = MessageDeliveryMode.PERSISTENT;
6977

7078
public RepublishMessageRecoverer(AmqpTemplate errorTemplate) {
@@ -80,6 +88,9 @@ public RepublishMessageRecoverer(AmqpTemplate errorTemplate, String errorExchang
8088
this.errorTemplate = errorTemplate;
8189
this.errorExchangeName = errorExchange;
8290
this.errorRoutingKey = errorRoutingKey;
91+
if (!(this.errorTemplate instanceof RabbitTemplate)) {
92+
this.maxStackTraceLength = Integer.MAX_VALUE;
93+
}
8394
}
8495

8596
/**
@@ -94,6 +105,19 @@ public RepublishMessageRecoverer errorRoutingKeyPrefix(String errorRoutingKeyPre
94105
return this;
95106
}
96107

108+
/**
109+
* Set the amount by which the negotiated frame_max is to be reduced when considering
110+
* truncating the stack trace header. Defaults to
111+
* {@value #DEFAULT_FRAME_MAX_HEADROOM}.
112+
* @param headroom the headroom
113+
* @return this.
114+
* @since 2.0.5
115+
*/
116+
public RepublishMessageRecoverer frameMaxHeadroom(int headroom) {
117+
this.frameMaxHeadroom = headroom;
118+
return this;
119+
}
120+
97121
/**
98122
* @param errorRoutingKeyPrefix The prefix (default "error.").
99123
* @see #errorRoutingKeyPrefix(String)
@@ -126,7 +150,21 @@ protected MessageDeliveryMode getDeliveryMode() {
126150
public void recover(Message message, Throwable cause) {
127151
MessageProperties messageProperties = message.getMessageProperties();
128152
Map<String, Object> headers = messageProperties.getHeaders();
129-
headers.put(X_EXCEPTION_STACKTRACE, getStackTraceAsString(cause));
153+
String stackTraceAsString = getStackTraceAsString(cause);
154+
if (this.maxStackTraceLength < 0) {
155+
int maxStackTraceLength = RabbitUtils
156+
.getMaxFrame(((RabbitTemplate) this.errorTemplate).getConnectionFactory());
157+
if (maxStackTraceLength > 0) {
158+
maxStackTraceLength -= this.frameMaxHeadroom;
159+
this.maxStackTraceLength = maxStackTraceLength;
160+
}
161+
}
162+
if (this.maxStackTraceLength > 0 && stackTraceAsString.length() > this.maxStackTraceLength) {
163+
stackTraceAsString = stackTraceAsString.substring(0, this.maxStackTraceLength);
164+
this.logger.warn("Stack trace in republished message header truncated due to frame_max limitations; "
165+
+ "consider increasing frame_max on the broker or reduce the stack trace depth", cause);
166+
}
167+
headers.put(X_EXCEPTION_STACKTRACE, stackTraceAsString);
130168
headers.put(X_EXCEPTION_MESSAGE, cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage());
131169
headers.put(X_ORIGINAL_EXCHANGE, messageProperties.getReceivedExchange());
132170
headers.put(X_ORIGINAL_ROUTING_KEY, messageProperties.getReceivedRoutingKey());
@@ -140,17 +178,20 @@ public void recover(Message message, Throwable cause) {
140178
}
141179

142180
if (null != this.errorExchangeName) {
143-
String routingKey = this.errorRoutingKey != null ? this.errorRoutingKey : this.prefixedOriginalRoutingKey(message);
181+
String routingKey = this.errorRoutingKey != null ? this.errorRoutingKey
182+
: this.prefixedOriginalRoutingKey(message);
144183
this.errorTemplate.send(this.errorExchangeName, routingKey, message);
145184
if (this.logger.isWarnEnabled()) {
146-
this.logger.warn("Republishing failed message to exchange " + this.errorExchangeName);
185+
this.logger.warn("Republishing failed message to exchange '" + this.errorExchangeName
186+
+ "' with routing key " + routingKey);
147187
}
148188
}
149189
else {
150190
final String routingKey = this.prefixedOriginalRoutingKey(message);
151191
this.errorTemplate.send(routingKey, message);
152192
if (this.logger.isWarnEnabled()) {
153-
this.logger.warn("Republishing failed message to the template's default exchange with routing key " + routingKey);
193+
this.logger.warn("Republishing failed message to the template's default exchange with routing key "
194+
+ routingKey);
154195
}
155196
}
156197
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.retry;
18+
19+
import static org.hamcrest.Matchers.equalTo;
20+
import static org.hamcrest.Matchers.greaterThan;
21+
import static org.junit.Assert.assertNotNull;
22+
import static org.junit.Assert.assertThat;
23+
24+
import java.io.PrintWriter;
25+
import java.io.StringWriter;
26+
27+
import org.junit.jupiter.api.Test;
28+
29+
import org.springframework.amqp.core.Message;
30+
import org.springframework.amqp.core.MessageProperties;
31+
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
32+
import org.springframework.amqp.rabbit.connection.RabbitUtils;
33+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
34+
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
35+
import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition;
36+
37+
import com.rabbitmq.client.LongString;
38+
39+
/**
40+
* @author Gary Russell
41+
* @author Artem Bilan
42+
*
43+
* @since 2.0.5
44+
*
45+
*/
46+
@RabbitAvailable(queues = RepublishMessageRecovererIntegrationTests.BIG_HEADER_QUEUE)
47+
public class RepublishMessageRecovererIntegrationTests {
48+
49+
public static final String BIG_HEADER_QUEUE = "big.header.queue";
50+
51+
private static final String BIG_EXCEPTION_MESSAGE = new String(new byte[10_000]).replaceAll("\u0000", "x");
52+
53+
private long maxHeaderSize;
54+
55+
@Test
56+
public void testBigHeader() {
57+
RabbitTemplate template = new RabbitTemplate(
58+
new CachingConnectionFactory(RabbitAvailableCondition.getBrokerRunning().getConnectionFactory()));
59+
this.maxHeaderSize = RabbitUtils.getMaxFrame(template.getConnectionFactory()) - 20_000;
60+
assertThat(this.maxHeaderSize, greaterThan(0L));
61+
RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(template, "", BIG_HEADER_QUEUE);
62+
recoverer.recover(new Message("foo".getBytes(), new MessageProperties()),
63+
bigCause(new RuntimeException(BIG_EXCEPTION_MESSAGE)));
64+
Message received = template.receive(BIG_HEADER_QUEUE, 10_000);
65+
assertNotNull(received);
66+
assertThat(((LongString) received.getMessageProperties().getHeaders()
67+
.get(RepublishMessageRecoverer.X_EXCEPTION_STACKTRACE)).length(), equalTo(this.maxHeaderSize));
68+
}
69+
70+
private Throwable bigCause(Throwable cause) {
71+
if (getStackTraceAsString(cause).length() > this.maxHeaderSize) {
72+
return cause;
73+
}
74+
return bigCause(new RuntimeException(BIG_EXCEPTION_MESSAGE, cause));
75+
}
76+
77+
private String getStackTraceAsString(Throwable cause) {
78+
StringWriter stringWriter = new StringWriter();
79+
PrintWriter printWriter = new PrintWriter(stringWriter, true);
80+
cause.printStackTrace(printWriter);
81+
return stringWriter.getBuffer().toString();
82+
}
83+
84+
}

0 commit comments

Comments
 (0)