148
148
*/
149
149
public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count
150
150
implements BeanFactoryAware , RabbitOperations , MessageListener ,
151
- ListenerContainerAware , PublisherCallbackChannel .Listener , Lifecycle , BeanNameAware {
151
+ ListenerContainerAware , PublisherCallbackChannel .Listener , Lifecycle , BeanNameAware {
152
152
153
153
private static final String UNCHECKED = "unchecked" ;
154
154
@@ -894,9 +894,9 @@ protected void doStart() {
894
894
public final void stop () {
895
895
synchronized (this .directReplyToContainers ) {
896
896
this .directReplyToContainers .values ()
897
- .stream ()
898
- .filter (AbstractMessageListenerContainer ::isRunning )
899
- .forEach (AbstractMessageListenerContainer ::stop );
897
+ .stream ()
898
+ .filter (AbstractMessageListenerContainer ::isRunning )
899
+ .forEach (AbstractMessageListenerContainer ::stop );
900
900
this .directReplyToContainers .clear ();
901
901
}
902
902
doStop ();
@@ -958,15 +958,13 @@ protected boolean useDirectReplyTo() {
958
958
while (cause != null && !(cause instanceof ShutdownSignalException )) {
959
959
cause = cause .getCause ();
960
960
}
961
- if (cause instanceof ShutdownSignalException ) {
962
- if (RabbitUtils .isPassiveDeclarationChannelClose ((ShutdownSignalException ) cause )) {
963
- if (logger .isWarnEnabled ()) {
964
- logger .warn ("Broker does not support fast replies via 'amq.rabbitmq.reply-to', temporary "
965
- + "queues will be used: " + cause .getMessage () + "." );
966
- }
967
- this .replyAddress = null ;
968
- return false ;
961
+ if (cause != null && RabbitUtils .isPassiveDeclarationChannelClose ((ShutdownSignalException ) cause )) {
962
+ if (logger .isWarnEnabled ()) {
963
+ logger .warn ("Broker does not support fast replies via 'amq.rabbitmq.reply-to', temporary "
964
+ + "queues will be used: " + cause .getMessage () + "." );
969
965
}
966
+ this .replyAddress = null ;
967
+ return false ;
970
968
}
971
969
if (logger .isDebugEnabled ()) {
972
970
logger .debug ("IO error, deferring directReplyTo detection: " + ex .toString ());
@@ -1001,7 +999,7 @@ public void send(final String exchange, final String routingKey,
1001
999
(RabbitTemplate .this .returnCallback != null
1002
1000
|| (correlationData != null && StringUtils .hasText (correlationData .getId ())))
1003
1001
&& RabbitTemplate .this .mandatoryExpression .getValue (
1004
- RabbitTemplate .this .evaluationContext , message , Boolean .class ),
1002
+ RabbitTemplate .this .evaluationContext , message , Boolean .class ),
1005
1003
correlationData );
1006
1004
return null ;
1007
1005
}, obtainTargetConnectionFactory (this .sendConnectionFactorySelectorExpression , message ));
@@ -1262,21 +1260,24 @@ public <R, S> boolean receiveAndReply(final String queueName, ReceiveAndReplyCal
1262
1260
}
1263
1261
1264
1262
@ Override
1265
- public <R , S > boolean receiveAndReply (ReceiveAndReplyCallback <R , S > callback , final String exchange , final String routingKey )
1266
- throws AmqpException {
1263
+ public <R , S > boolean receiveAndReply (ReceiveAndReplyCallback <R , S > callback , final String exchange ,
1264
+ final String routingKey ) throws AmqpException {
1265
+
1267
1266
return receiveAndReply (this .getRequiredQueue (), callback , exchange , routingKey );
1268
1267
}
1269
1268
1270
1269
@ Override
1271
- public <R , S > boolean receiveAndReply (final String queueName , ReceiveAndReplyCallback <R , S > callback , final String replyExchange ,
1272
- final String replyRoutingKey ) throws AmqpException {
1270
+ public <R , S > boolean receiveAndReply (final String queueName , ReceiveAndReplyCallback <R , S > callback ,
1271
+ final String replyExchange , final String replyRoutingKey ) throws AmqpException {
1272
+
1273
1273
return receiveAndReply (queueName , callback ,
1274
1274
(request , reply ) -> new Address (replyExchange , replyRoutingKey ));
1275
1275
}
1276
1276
1277
1277
@ Override
1278
- public <R , S > boolean receiveAndReply (ReceiveAndReplyCallback <R , S > callback , ReplyToAddressCallback <S > replyToAddressCallback )
1279
- throws AmqpException {
1278
+ public <R , S > boolean receiveAndReply (ReceiveAndReplyCallback <R , S > callback ,
1279
+ ReplyToAddressCallback <S > replyToAddressCallback ) throws AmqpException {
1280
+
1280
1281
return receiveAndReply (this .getRequiredQueue (), callback , replyToAddressCallback );
1281
1282
}
1282
1283
@@ -1290,18 +1291,17 @@ private <R, S> boolean doReceiveAndReply(final String queueName, final ReceiveAn
1290
1291
final ReplyToAddressCallback <S > replyToAddressCallback ) throws AmqpException {
1291
1292
1292
1293
Boolean result = execute (channel -> {
1293
- Message receiveMessage = receiveForReply (queueName , channel );
1294
- if (receiveMessage != null ) {
1295
- return sendReply (callback , replyToAddressCallback , channel , receiveMessage );
1296
- }
1297
- return false ;
1298
- }, obtainTargetConnectionFactory (this .receiveConnectionFactorySelectorExpression , queueName ));
1294
+ Message receiveMessage = receiveForReply (queueName , channel );
1295
+ if (receiveMessage != null ) {
1296
+ return sendReply (callback , replyToAddressCallback , channel , receiveMessage );
1297
+ }
1298
+ return false ;
1299
+ }, obtainTargetConnectionFactory (this .receiveConnectionFactorySelectorExpression , queueName ));
1299
1300
return result == null ? false : result ;
1300
1301
}
1301
1302
1302
1303
@ Nullable
1303
1304
private Message receiveForReply (final String queueName , Channel channel ) throws IOException {
1304
-
1305
1305
boolean channelTransacted = isChannelTransacted ();
1306
1306
boolean channelLocallyTransacted = isChannelLocallyTransacted (channel );
1307
1307
Message receiveMessage = null ;
@@ -1358,7 +1358,7 @@ private Delivery consumeDelivery(Channel channel, String queueName, long timeout
1358
1358
DefaultConsumer consumer = null ;
1359
1359
try {
1360
1360
consumer = createConsumer (queueName , channel , future ,
1361
- timeoutMillis < 0 ? DEFAULT_CONSUME_TIMEOUT : timeoutMillis );
1361
+ timeoutMillis < 0 ? DEFAULT_CONSUME_TIMEOUT : timeoutMillis );
1362
1362
if (timeoutMillis < 0 ) {
1363
1363
delivery = future .get ();
1364
1364
}
@@ -1404,7 +1404,7 @@ else if (logger.isDebugEnabled()) {
1404
1404
@ SuppressWarnings (UNCHECKED )
1405
1405
private <R , S > boolean sendReply (final ReceiveAndReplyCallback <R , S > callback ,
1406
1406
final ReplyToAddressCallback <S > replyToAddressCallback , Channel channel , Message receiveMessage )
1407
- throws IOException {
1407
+ throws IOException {
1408
1408
1409
1409
Object receive = receiveMessage ;
1410
1410
if (!(ReceiveAndReplyMessageCallback .class .isAssignableFrom (callback .getClass ()))) {
@@ -1583,8 +1583,10 @@ public Object convertSendAndReceive(final String routingKey, final Object messag
1583
1583
1584
1584
@ Override
1585
1585
@ Nullable
1586
- public Object convertSendAndReceive (final String routingKey , final Object message , final MessagePostProcessor messagePostProcessor ,
1587
- @ Nullable CorrelationData correlationData ) throws AmqpException {
1586
+ public Object convertSendAndReceive (final String routingKey , final Object message ,
1587
+ final MessagePostProcessor messagePostProcessor , @ Nullable CorrelationData correlationData )
1588
+ throws AmqpException {
1589
+
1588
1590
return convertSendAndReceive (this .exchange , routingKey , message , messagePostProcessor , correlationData );
1589
1591
}
1590
1592
@@ -1637,7 +1639,7 @@ public <T> T convertSendAndReceiveAsType(final String routingKey, final Object m
1637
1639
@ Nullable
1638
1640
public <T > T convertSendAndReceiveAsType (final String routingKey , final Object message ,
1639
1641
@ Nullable CorrelationData correlationData , ParameterizedTypeReference <T > responseType )
1640
- throws AmqpException {
1642
+ throws AmqpException {
1641
1643
1642
1644
return convertSendAndReceiveAsType (this .exchange , routingKey , message , null , correlationData , responseType );
1643
1645
}
@@ -1664,7 +1666,7 @@ public <T> T convertSendAndReceiveAsType(final Object message,
1664
1666
public <T > T convertSendAndReceiveAsType (final Object message ,
1665
1667
@ Nullable final MessagePostProcessor messagePostProcessor ,
1666
1668
@ Nullable CorrelationData correlationData , ParameterizedTypeReference <T > responseType )
1667
- throws AmqpException {
1669
+ throws AmqpException {
1668
1670
1669
1671
return convertSendAndReceiveAsType (this .exchange , this .routingKey , message , messagePostProcessor ,
1670
1672
correlationData , responseType );
@@ -1674,7 +1676,7 @@ public <T> T convertSendAndReceiveAsType(final Object message,
1674
1676
@ Nullable
1675
1677
public <T > T convertSendAndReceiveAsType (final String routingKey , final Object message ,
1676
1678
@ Nullable final MessagePostProcessor messagePostProcessor , ParameterizedTypeReference <T > responseType )
1677
- throws AmqpException {
1679
+ throws AmqpException {
1678
1680
1679
1681
return convertSendAndReceiveAsType (routingKey , message , messagePostProcessor , null , responseType );
1680
1682
}
@@ -1684,6 +1686,7 @@ public <T> T convertSendAndReceiveAsType(final String routingKey, final Object m
1684
1686
public <T > T convertSendAndReceiveAsType (final String routingKey , final Object message ,
1685
1687
@ Nullable final MessagePostProcessor messagePostProcessor , @ Nullable CorrelationData correlationData ,
1686
1688
ParameterizedTypeReference <T > responseType ) throws AmqpException {
1689
+
1687
1690
return convertSendAndReceiveAsType (this .exchange , routingKey , message , messagePostProcessor , correlationData ,
1688
1691
responseType );
1689
1692
}
@@ -1693,6 +1696,7 @@ public <T> T convertSendAndReceiveAsType(final String routingKey, final Object m
1693
1696
public <T > T convertSendAndReceiveAsType (final String exchange , final String routingKey , final Object message ,
1694
1697
final MessagePostProcessor messagePostProcessor , ParameterizedTypeReference <T > responseType )
1695
1698
throws AmqpException {
1699
+
1696
1700
return convertSendAndReceiveAsType (exchange , routingKey , message , messagePostProcessor , null , responseType );
1697
1701
}
1698
1702
@@ -1754,6 +1758,7 @@ protected Message convertMessageIfNecessary(final Object object) {
1754
1758
@ Nullable
1755
1759
protected Message doSendAndReceive (final String exchange , final String routingKey , final Message message ,
1756
1760
@ Nullable CorrelationData correlationData ) {
1761
+
1757
1762
if (!this .evaluatedFastReplyTo ) {
1758
1763
synchronized (this ) {
1759
1764
if (!this .evaluatedFastReplyTo ) {
@@ -1776,13 +1781,15 @@ else if (this.replyAddress == null || this.usingFastReplyTo) {
1776
1781
@ Nullable
1777
1782
protected Message doSendAndReceiveWithTemporary (final String exchange , final String routingKey ,
1778
1783
final Message message , final CorrelationData correlationData ) {
1784
+
1779
1785
return execute (channel -> {
1780
1786
final PendingReply pendingReply = new PendingReply ();
1781
1787
String messageTag = String .valueOf (RabbitTemplate .this .messageTagProvider .incrementAndGet ());
1782
1788
RabbitTemplate .this .replyHolder .putIfAbsent (messageTag , pendingReply );
1783
1789
1784
1790
Assert .isNull (message .getMessageProperties ().getReplyTo (),
1785
- "Send-and-receive methods can only be used if the Message does not already have a replyTo property." );
1791
+ "Send-and-receive methods can only be used " +
1792
+ "if the Message does not already have a replyTo property." );
1786
1793
String replyTo ;
1787
1794
if (RabbitTemplate .this .usingFastReplyTo ) {
1788
1795
replyTo = Address .AMQ_RABBITMQ_REPLY_TO ;
@@ -2065,8 +2072,9 @@ private <T> T doExecute(ChannelCallback<T> action, ConnectionFactory connectionF
2065
2072
Connection connection = null ; // NOSONAR (close)
2066
2073
if (channel == null ) {
2067
2074
if (isChannelTransacted ()) {
2068
- resourceHolder = ConnectionFactoryUtils .
2069
- getTransactionalResourceHolder (connectionFactory , true , this .usePublisherConnection );
2075
+ resourceHolder =
2076
+ ConnectionFactoryUtils .getTransactionalResourceHolder (connectionFactory ,
2077
+ true , this .usePublisherConnection );
2070
2078
channel = resourceHolder .getChannel ();
2071
2079
if (channel == null ) {
2072
2080
ConnectionFactoryUtils .releaseResources (resourceHolder );
@@ -2266,8 +2274,7 @@ public void determineConfirmsReturnsCapability(ConnectionFactory connectionFacto
2266
2274
* @throws IOException If thrown by RabbitMQ API methods.
2267
2275
*/
2268
2276
public void doSend (Channel channel , String exchangeArg , String routingKeyArg , Message message , // NOSONAR complexity
2269
- boolean mandatory , @ Nullable CorrelationData correlationData )
2270
- throws IOException {
2277
+ boolean mandatory , @ Nullable CorrelationData correlationData ) throws IOException {
2271
2278
2272
2279
String exch = exchangeArg ;
2273
2280
String rKey = routingKeyArg ;
@@ -2353,7 +2360,8 @@ private Message buildMessageFromDelivery(Delivery delivery) {
2353
2360
}
2354
2361
2355
2362
private Message buildMessageFromResponse (GetResponse response ) {
2356
- return buildMessage (response .getEnvelope (), response .getProps (), response .getBody (), response .getMessageCount ());
2363
+ return buildMessage (response .getEnvelope (), response .getProps (), response .getBody (),
2364
+ response .getMessageCount ());
2357
2365
}
2358
2366
2359
2367
private Message buildMessage (Envelope envelope , BasicProperties properties , byte [] body , int msgCount ) {
@@ -2414,7 +2422,8 @@ private Address getReplyToAddress(Message request) throws AmqpException {
2414
2422
if (this .exchange == null ) {
2415
2423
throw new AmqpException (
2416
2424
"Cannot determine ReplyTo message property value: "
2417
- + "Request message does not contain reply-to property, and no default Exchange was set." );
2425
+ + "Request message does not contain reply-to property, " +
2426
+ "and no default Exchange was set." );
2418
2427
}
2419
2428
replyTo = new Address (this .exchange , this .routingKey );
2420
2429
}
@@ -2448,7 +2457,8 @@ public void addListener(Channel channel) {
2448
2457
@ Override
2449
2458
public void handleConfirm (PendingConfirm pendingConfirm , boolean ack ) {
2450
2459
if (this .confirmCallback != null ) {
2451
- this .confirmCallback .confirm (pendingConfirm .getCorrelationData (), ack , pendingConfirm .getCause ()); // NOSONAR never null
2460
+ this .confirmCallback
2461
+ .confirm (pendingConfirm .getCorrelationData (), ack , pendingConfirm .getCause ()); // NOSONAR never null
2452
2462
}
2453
2463
}
2454
2464
@@ -2569,7 +2579,7 @@ private void restoreProperties(Message message, PendingReply pendingReply) {
2569
2579
2570
2580
private DefaultConsumer createConsumer (final String queueName , Channel channel ,
2571
2581
CompletableFuture <Delivery > future , long timeoutMillis ) throws IOException , TimeoutException ,
2572
- InterruptedException {
2582
+ InterruptedException {
2573
2583
2574
2584
channel .basicQos (1 );
2575
2585
final CountDownLatch latch = new CountDownLatch (1 );
@@ -2717,8 +2727,8 @@ public interface ReturnCallback {
2717
2727
* @param exchange the exchange.
2718
2728
* @param routingKey the routing key.
2719
2729
*/
2720
- void returnedMessage (Message message , int replyCode , String replyText ,
2721
- String exchange , String routingKey );
2730
+ void returnedMessage (Message message , int replyCode , String replyText , String exchange , String routingKey );
2731
+
2722
2732
}
2723
2733
2724
2734
}
0 commit comments