Skip to content

Commit 98c6abc

Browse files
garyrussellartembilan
authored andcommitted
GH-1113: Clean up cals to basicCancel
Resolves #1113 Use `RabbitUtils` for all `basicCancel` calls, with try/catch for all exceptions. * Add channel to log message.
1 parent ceac6b0 commit 98c6abc

File tree

4 files changed

+23
-41
lines changed

4 files changed

+23
-41
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitUtils.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public abstract class RabbitUtils {
8181
*/
8282
public static final int CHANNEL_PROTOCOL_CLASS_ID_20 = 20;
8383

84-
private static final Log logger = LogFactory.getLog(RabbitUtils.class); // NOSONAR - lower case
84+
private static final Log LOGGER = LogFactory.getLog(RabbitUtils.class);
8585

8686
private static final ThreadLocal<Boolean> physicalCloseRequired = new ThreadLocal<>(); // NOSONAR - lower case
8787

@@ -99,7 +99,7 @@ public static void closeConnection(@Nullable Connection connection) {
9999
// empty
100100
}
101101
catch (Exception ex) {
102-
logger.debug("Ignoring Connection exception - assuming already closed: " + ex.getMessage(), ex);
102+
LOGGER.debug("Ignoring Connection exception - assuming already closed: " + ex.getMessage(), ex);
103103
}
104104
}
105105
}
@@ -118,15 +118,15 @@ public static void closeChannel(@Nullable Channel channel) {
118118
// empty
119119
}
120120
catch (IOException ex) {
121-
logger.debug("Could not close RabbitMQ Channel", ex);
121+
LOGGER.debug("Could not close RabbitMQ Channel", ex);
122122
}
123123
catch (ShutdownSignalException sig) {
124124
if (!isNormalShutdown(sig)) {
125-
logger.debug("Unexpected exception on closing RabbitMQ Channel", sig);
125+
LOGGER.debug("Unexpected exception on closing RabbitMQ Channel", sig);
126126
}
127127
}
128128
catch (Exception ex) {
129-
logger.debug("Unexpected exception on closing RabbitMQ Channel", ex);
129+
LOGGER.debug("Unexpected exception on closing RabbitMQ Channel", ex);
130130
}
131131
}
132132
}
@@ -182,18 +182,18 @@ public static void closeMessageConsumer(Channel channel, Collection<String> cons
182182
}
183183
}
184184

185-
private static void cancel(Channel channel, String consumerTag) {
185+
public static void cancel(Channel channel, String consumerTag) {
186186
try {
187187
channel.basicCancel(consumerTag);
188188
}
189-
catch (IOException e) {
190-
if (logger.isDebugEnabled()) {
191-
logger.debug("Error performing 'basicCancel'", e);
189+
catch (AlreadyClosedException e) {
190+
if (LOGGER.isTraceEnabled()) {
191+
LOGGER.trace(channel + " is already closed", e);
192192
}
193193
}
194-
catch (AlreadyClosedException e) {
195-
if (logger.isTraceEnabled()) {
196-
logger.trace(channel + " is already closed");
194+
catch (Exception e) {
195+
if (LOGGER.isDebugEnabled()) {
196+
LOGGER.debug("Error performing 'basicCancel' on " + channel, e);
197197
}
198198
}
199199
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1858,14 +1858,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
18581858
}
18591859

18601860
private void cancelConsumerQuietly(Channel channel, DefaultConsumer consumer) {
1861-
try {
1862-
channel.basicCancel(consumer.getConsumerTag());
1863-
}
1864-
catch (Exception e) {
1865-
if (this.logger.isDebugEnabled()) {
1866-
this.logger.debug("Failed to cancel consumer: " + consumer, e);
1867-
}
1868-
}
1861+
RabbitUtils.cancel(channel, consumer.getConsumerTag());
18691862
}
18701863

18711864
@Nullable

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@
6969
import org.springframework.util.backoff.BackOffExecution;
7070

7171
import com.rabbitmq.client.AMQP;
72-
import com.rabbitmq.client.AlreadyClosedException;
7372
import com.rabbitmq.client.Channel;
7473
import com.rabbitmq.client.DefaultConsumer;
7574
import com.rabbitmq.client.Envelope;
@@ -405,20 +404,8 @@ protected void basicCancel() {
405404
protected void basicCancel(boolean expected) {
406405
this.normalCancel = expected;
407406
getConsumerTags().forEach(consumerTag -> {
408-
try {
409-
if (this.channel.isOpen()) {
410-
this.channel.basicCancel(consumerTag);
411-
}
412-
}
413-
catch (IOException | IllegalStateException e) {
414-
if (logger.isDebugEnabled()) {
415-
logger.debug("Error performing 'basicCancel'", e);
416-
}
417-
}
418-
catch (AlreadyClosedException e) {
419-
if (logger.isTraceEnabled()) {
420-
logger.trace(this.channel + " is already closed");
421-
}
407+
if (this.channel.isOpen()) {
408+
RabbitUtils.cancel(this.channel, consumerTag);
422409
}
423410
});
424411
this.cancelled.set(true);
@@ -913,7 +900,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
913900
// Defensive - should never happen
914901
BlockingQueueConsumer.this.queue.clear();
915902
if (!this.canceled) {
916-
channelToClose.basicCancel(consumerTag);
903+
RabbitUtils.cancel(channelToClose, consumerTag);
917904
}
918905
try {
919906
channelToClose.close();

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -819,13 +819,15 @@ private void cancelConsumer(SimpleConsumer consumer) {
819819
synchronized (consumer) {
820820
consumer.setCanceled(true);
821821
if (this.messagesPerAck > 1) {
822-
consumer.ackIfNecessary(0L);
822+
try {
823+
consumer.ackIfNecessary(0L);
824+
}
825+
catch (IOException e) {
826+
this.logger.error("Exception while sending delayed ack", e);
827+
}
823828
}
824829
}
825-
consumer.getChannel().basicCancel(consumer.getConsumerTag());
826-
}
827-
catch (IOException e) {
828-
this.logger.error("Failed to cancel consumer: " + consumer, e);
830+
RabbitUtils.cancel(consumer.getChannel(), consumer.getConsumerTag());
829831
}
830832
finally {
831833
this.consumers.remove(consumer);

0 commit comments

Comments
 (0)