Skip to content

Commit af76b70

Browse files
egusevartembilan
authored andcommitted
GH-1014: Add addMdcAsHeaders into appenders
Fixes #1014 **Cherry-pick to 2.1.x, 2.0.x & 1.7.x** GH-1014 minor changes GH-1014 renamed property to addMdcAsHeaders GH-1014 added addMdcAsHeaders into documentation GH-1014 added addMdcAsHeaders into logback appender. added integration test GH-1014 updated documentation GH-1014 minor fix GH-1014 updated documentation GH-1014 minor fix GH-1014 removed this prefix * Made addMdcAsHeaders true by default * Polishing # Conflicts: # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/log4j2/AmqpAppenderTests.java # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/logback/AmqpAppenderIntegrationTests.java # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/log4j2/AmqpAppender.java # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/logback/AmqpAppender.java # src/reference/asciidoc/logging.adoc # src/reference/asciidoc/whats-new.adoc
1 parent 68a6411 commit af76b70

File tree

7 files changed

+252
-101
lines changed

7 files changed

+252
-101
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/log4j2/AmqpAppender.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
* @author Artem Bilan
8484
* @author Dominique Villard
8585
* @author Nicolas Ristock
86+
* @author Eugene Gusev
8687
*
8788
* @since 1.6
8889
*/
@@ -176,7 +177,9 @@ public static AmqpAppender createAppender(
176177
@PluginAttribute("async") boolean async,
177178
@PluginAttribute("charset") String charset,
178179
@PluginAttribute(value = "bufferSize", defaultInt = Integer.MAX_VALUE) int bufferSize,
179-
@PluginElement(BlockingQueueFactory.ELEMENT_TYPE) BlockingQueueFactory<Event> blockingQueueFactory) {
180+
@PluginElement(BlockingQueueFactory.ELEMENT_TYPE) BlockingQueueFactory<Event> blockingQueueFactory,
181+
@PluginAttribute(value = "addMdcAsHeaders", defaultBoolean = true) boolean addMdcAsHeaders) {
182+
180183
if (name == null) {
181184
LOGGER.error("No name for AmqpAppender");
182185
}
@@ -217,6 +220,7 @@ public static AmqpAppender createAppender(
217220
manager.clientConnectionProperties = clientConnectionProperties;
218221
manager.charset = charset;
219222
manager.async = async;
223+
manager.addMdcAsHeaders = addMdcAsHeaders;
220224

221225
BlockingQueue<Event> eventQueue;
222226
if (blockingQueueFactory == null) {
@@ -274,7 +278,7 @@ protected Message postProcessMessageBeforeSend(Message message, Event event) {
274278
return message;
275279
}
276280

277-
private void sendEvent(Event event, Map<?, ?> properties) {
281+
protected void sendEvent(Event event, Map<?, ?> properties) {
278282
LogEvent logEvent = event.getEvent();
279283
String name = logEvent.getLoggerName();
280284
Level level = logEvent.getLevel();
@@ -303,8 +307,10 @@ private void sendEvent(Event event, Map<?, ?> properties) {
303307
amqpProps.setTimestamp(tstamp.getTime());
304308

305309
// Copy properties in from MDC
306-
for (Entry<?, ?> entry : properties.entrySet()) {
307-
amqpProps.setHeader(entry.getKey().toString(), entry.getValue());
310+
if (this.manager.addMdcAsHeaders) {
311+
for (Entry<?, ?> entry : properties.entrySet()) {
312+
amqpProps.setHeader(entry.getKey().toString(), entry.getValue());
313+
}
308314
}
309315
if (logEvent.getSource() != null) {
310316
amqpProps.setHeader(
@@ -314,6 +320,10 @@ private void sendEvent(Event event, Map<?, ?> properties) {
314320
logEvent.getSource().getLineNumber()));
315321
}
316322

323+
doSend(event, logEvent, amqpProps);
324+
}
325+
326+
protected void doSend(Event event, LogEvent logEvent, MessageProperties amqpProps) {
317327
StringBuilder msgBody;
318328
String routingKey;
319329

@@ -583,6 +593,11 @@ protected static class AmqpManager extends AbstractManager {
583593
*/
584594
private String charset = Charset.defaultCharset().name();
585595

596+
/**
597+
* Whether or not add MDC properties into message headers. true by default for backward compatibility
598+
*/
599+
private boolean addMdcAsHeaders = true;
600+
586601
private boolean durable = true;
587602

588603
private MessageDeliveryMode deliveryMode = MessageDeliveryMode.PERSISTENT;

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/logback/AmqpAppender.java

Lines changed: 127 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@
8989
* @author Stephen Oakey
9090
* @author Dominique Villard
9191
* @author Nicolas Ristock
92+
* @author Eugene Gusev
9293
*
9394
* @since 1.4
9495
*/
@@ -286,6 +287,11 @@ public class AmqpAppender extends AppenderBase<ILoggingEvent> {
286287
*/
287288
private String charset;
288289

290+
/**
291+
* Whether or not add MDC properties into message headers. true by default for backward compatibility
292+
*/
293+
private boolean addMdcAsHeaders = true;
294+
289295
private boolean durable = true;
290296

291297
private MessageDeliveryMode deliveryMode = MessageDeliveryMode.PERSISTENT;
@@ -505,6 +511,14 @@ public void setMaxSenderRetries(int maxSenderRetries) {
505511
this.maxSenderRetries = maxSenderRetries;
506512
}
507513

514+
public boolean isAddMdcAsHeaders() {
515+
return this.addMdcAsHeaders;
516+
}
517+
518+
public void setAddMdcAsHeaders(boolean addMdcAsHeaders) {
519+
this.addMdcAsHeaders = addMdcAsHeaders;
520+
}
521+
508522
public boolean isDurable() {
509523
return this.durable;
510524
}
@@ -750,6 +764,54 @@ else if ("headers".equals(this.exchangeType)) {
750764
}
751765
}
752766

767+
protected MessageProperties prepareMessageProperties(Event event) {
768+
ILoggingEvent logEvent = event.getEvent();
769+
770+
String name = logEvent.getLoggerName();
771+
Level level = logEvent.getLevel();
772+
773+
MessageProperties amqpProps = new MessageProperties();
774+
amqpProps.setDeliveryMode(this.deliveryMode);
775+
amqpProps.setContentType(this.contentType);
776+
if (null != this.contentEncoding) {
777+
amqpProps.setContentEncoding(this.contentEncoding);
778+
}
779+
amqpProps.setHeader(CATEGORY_NAME, name);
780+
amqpProps.setHeader(THREAD_NAME, logEvent.getThreadName());
781+
amqpProps.setHeader(CATEGORY_LEVEL, level.toString());
782+
if (this.generateId) {
783+
amqpProps.setMessageId(UUID.randomUUID().toString());
784+
}
785+
786+
// Set timestamp
787+
Calendar tstamp = Calendar.getInstance();
788+
tstamp.setTimeInMillis(logEvent.getTimeStamp());
789+
amqpProps.setTimestamp(tstamp.getTime());
790+
791+
// Copy properties in from MDC
792+
if (this.addMdcAsHeaders) {
793+
Map<String, String> props = event.getProperties();
794+
Set<Entry<String, String>> entrySet = props.entrySet();
795+
for (Entry<String, String> entry : entrySet) {
796+
amqpProps.setHeader(entry.getKey(), entry.getValue());
797+
}
798+
}
799+
800+
String[] location = this.locationLayout.doLayout(logEvent).split("\\|");
801+
if (!"?".equals(location[0])) {
802+
amqpProps.setHeader(
803+
"location",
804+
String.format("%s.%s()[%s]", location[0], location[1], location[2]));
805+
}
806+
807+
// Set applicationId, if we're using one
808+
if (this.applicationId != null) {
809+
amqpProps.setAppId(this.applicationId);
810+
}
811+
812+
return amqpProps;
813+
}
814+
753815
/**
754816
* Subclasses may modify the final message before sending.
755817
*
@@ -773,101 +835,78 @@ public void run() {
773835
RabbitTemplate rabbitTemplate = new RabbitTemplate(AmqpAppender.this.connectionFactory);
774836
while (true) {
775837
final Event event = AmqpAppender.this.events.take();
776-
ILoggingEvent logEvent = event.getEvent();
777-
778-
String name = logEvent.getLoggerName();
779-
Level level = logEvent.getLevel();
780-
781-
MessageProperties amqpProps = new MessageProperties();
782-
amqpProps.setDeliveryMode(AmqpAppender.this.deliveryMode);
783-
amqpProps.setContentType(AmqpAppender.this.contentType);
784-
if (null != AmqpAppender.this.contentEncoding) {
785-
amqpProps.setContentEncoding(AmqpAppender.this.contentEncoding);
786-
}
787-
amqpProps.setHeader(CATEGORY_NAME, name);
788-
amqpProps.setHeader(THREAD_NAME, logEvent.getThreadName());
789-
amqpProps.setHeader(CATEGORY_LEVEL, level.toString());
790-
if (AmqpAppender.this.generateId) {
791-
amqpProps.setMessageId(UUID.randomUUID().toString());
792-
}
793-
794-
// Set timestamp
795-
Calendar tstamp = Calendar.getInstance();
796-
tstamp.setTimeInMillis(logEvent.getTimeStamp());
797-
amqpProps.setTimestamp(tstamp.getTime());
798-
799-
// Copy properties in from MDC
800-
Map<String, String> props = event.getProperties();
801-
Set<Entry<String, String>> entrySet = props.entrySet();
802-
for (Entry<String, String> entry : entrySet) {
803-
amqpProps.setHeader(entry.getKey(), entry.getValue());
804-
}
805-
String[] location = AmqpAppender.this.locationLayout.doLayout(logEvent).split("\\|");
806-
if (!"?".equals(location[0])) {
807-
amqpProps.setHeader(
808-
"location",
809-
String.format("%s.%s()[%s]", location[0], location[1], location[2]));
810-
}
811-
byte[] msgBody;
812-
String routingKey = AmqpAppender.this.routingKeyLayout.doLayout(logEvent);
813-
// Set applicationId, if we're using one
814-
if (AmqpAppender.this.applicationId != null) {
815-
amqpProps.setAppId(AmqpAppender.this.applicationId);
816-
}
817-
818-
if (AmqpAppender.this.encoder != null && AmqpAppender.this.headerWritten.compareAndSet(false, true)) {
819-
byte[] header = AmqpAppender.this.encoder.headerBytes();
820-
if (header != null && header.length > 0) {
821-
rabbitTemplate.convertAndSend(AmqpAppender.this.exchangeName, routingKey, header, m -> {
822-
if (AmqpAppender.this.applicationId != null) {
823-
m.getMessageProperties().setAppId(AmqpAppender.this.applicationId);
824-
}
825-
return m;
826-
});
827-
}
828-
}
829-
830-
if (AmqpAppender.this.abbreviator != null && logEvent instanceof LoggingEvent) {
831-
((LoggingEvent) logEvent).setLoggerName(AmqpAppender.this.abbreviator.abbreviate(name));
832-
msgBody = encodeMessage(logEvent);
833-
((LoggingEvent) logEvent).setLoggerName(name);
834-
}
835-
else {
836-
msgBody = encodeMessage(logEvent);
837-
}
838-
839-
// Send a message
840-
try {
841-
Message message = new Message(msgBody, amqpProps);
842-
843-
message = postProcessMessageBeforeSend(message, event);
844-
rabbitTemplate.send(AmqpAppender.this.exchangeName, routingKey, message);
845-
}
846-
catch (AmqpException e) {
847-
int retries = event.incrementRetries();
848-
if (retries < AmqpAppender.this.maxSenderRetries) {
849-
// Schedule a retry based on the number of times I've tried to re-send this
850-
AmqpAppender.this.retryTimer.schedule(new TimerTask() {
851-
852-
@Override
853-
public void run() {
854-
AmqpAppender.this.events.add(event);
855-
}
856-
857-
}, (long) (Math.pow(retries, Math.log(retries)) * 1000));
858-
}
859-
else {
860-
addError("Could not send log message " + logEvent.getMessage()
861-
+ " after " + AmqpAppender.this.maxSenderRetries + " retries", e);
862-
}
863-
}
838+
839+
MessageProperties amqpProps = prepareMessageProperties(event);
840+
841+
String routingKey = AmqpAppender.this.routingKeyLayout.doLayout(event.getEvent());
842+
843+
sendOneEncoderPatternMessage(rabbitTemplate, routingKey);
844+
845+
doSend(rabbitTemplate, event, event.getEvent(), name, amqpProps, routingKey);
864846
}
865847
}
866848
catch (InterruptedException e) {
867849
Thread.currentThread().interrupt();
868850
}
869851
}
870852

853+
private void sendOneEncoderPatternMessage(RabbitTemplate rabbitTemplate, String routingKey) {
854+
/*
855+
* If the encoder provides its pattern, send it as an additional one-time message.
856+
*/
857+
if (AmqpAppender.this.encoder != null
858+
&& AmqpAppender.this.headerWritten.compareAndSet(false, true)) {
859+
byte[] header = AmqpAppender.this.encoder.headerBytes();
860+
if (header != null && header.length > 0) {
861+
rabbitTemplate.convertAndSend(AmqpAppender.this.exchangeName, routingKey, header, m -> {
862+
if (AmqpAppender.this.applicationId != null) {
863+
m.getMessageProperties().setAppId(AmqpAppender.this.applicationId);
864+
}
865+
return m;
866+
});
867+
}
868+
}
869+
}
870+
871+
private void doSend(RabbitTemplate rabbitTemplate, final Event event, ILoggingEvent logEvent, String name,
872+
MessageProperties amqpProps, String routingKey) {
873+
byte[] msgBody;
874+
if (AmqpAppender.this.abbreviator != null && logEvent instanceof LoggingEvent) {
875+
((LoggingEvent) logEvent).setLoggerName(AmqpAppender.this.abbreviator.abbreviate(name));
876+
msgBody = encodeMessage(logEvent);
877+
((LoggingEvent) logEvent).setLoggerName(name);
878+
}
879+
else {
880+
msgBody = encodeMessage(logEvent);
881+
}
882+
883+
// Send a message
884+
try {
885+
Message message = new Message(msgBody, amqpProps);
886+
887+
message = postProcessMessageBeforeSend(message, event);
888+
rabbitTemplate.send(AmqpAppender.this.exchangeName, routingKey, message);
889+
}
890+
catch (AmqpException e) {
891+
int retries = event.incrementRetries();
892+
if (retries < AmqpAppender.this.maxSenderRetries) {
893+
// Schedule a retry based on the number of times I've tried to re-send this
894+
AmqpAppender.this.retryTimer.schedule(new TimerTask() {
895+
896+
@Override
897+
public void run() {
898+
AmqpAppender.this.events.add(event);
899+
}
900+
901+
}, (long) (Math.pow(retries, Math.log(retries)) * 1000)); // NOSONAR magic #
902+
}
903+
else {
904+
addError("Could not send log message " + logEvent.getMessage()
905+
+ " after " + AmqpAppender.this.maxSenderRetries + " retries", e);
906+
}
907+
}
908+
}
909+
871910
private byte[] encodeMessage(ILoggingEvent logEvent) {
872911
if (AmqpAppender.this.encoder != null) {
873912
return AmqpAppender.this.encoder.encode(logEvent);

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/log4j2/AmqpAppenderTests.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
* @author Artem Bilan
6868
* @author Dominique Villard
6969
* @author Nicolas Ristock
70+
* @author Eugene Gusev
7071
*
7172
* @since 1.6
7273
*/
@@ -162,6 +163,8 @@ public void testProperties() {
162163
assertEquals(5, TestUtils.getPropertyValue(manager, "maxSenderRetries"));
163164
// change the property to true and this fails and test() randomly fails too.
164165
assertFalse(TestUtils.getPropertyValue(manager, "async", Boolean.class));
166+
// default value
167+
assertTrue(TestUtils.getPropertyValue(manager, "addMdcAsHeaders", Boolean.class));
165168

166169
assertEquals(10, TestUtils.getPropertyValue(appender, "events.items", Object[].class).length);
167170

@@ -176,7 +179,11 @@ public void testAmqpAppenderEventQueueTypeDefaultsToLinkedBlockingQueue() throws
176179
Map.class).get("rabbitmq_default_queue");
177180

178181
Object events = TestUtils.getPropertyValue(appender, "events");
179-
assertEquals(LinkedBlockingQueue.class, events.getClass());
182+
183+
Object manager = TestUtils.getPropertyValue(appender, "manager");
184+
assertTrue(TestUtils.getPropertyValue(manager, "addMdcAsHeaders", Boolean.class));
185+
186+
assertThat(events, instanceOf(LinkedBlockingQueue.class));
180187
}
181188

182189
@Test
@@ -192,6 +199,7 @@ public void testUriProperties() {
192199
assertNull(TestUtils.getPropertyValue(manager, "username"));
193200
assertNull(TestUtils.getPropertyValue(manager, "password"));
194201
assertNull(TestUtils.getPropertyValue(manager, "virtualHost"));
202+
assertFalse(TestUtils.getPropertyValue(manager, "addMdcAsHeaders", Boolean.class));
195203
}
196204

197205
@Test

0 commit comments

Comments
 (0)