Skip to content

Commit 04c598b

Browse files
garyrussellartembilan
authored andcommitted
GH-1113: Clean up cals to basicCancel
Resolves #1113 Use `RabbitUtils` for all `basicCancel` calls, with try/catch for all exceptions. * Add channel to log message. # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java
1 parent 00ef68c commit 04c598b

File tree

4 files changed

+23
-41
lines changed

4 files changed

+23
-41
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitUtils.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public abstract class RabbitUtils {
8181
*/
8282
public static final int CHANNEL_PROTOCOL_CLASS_ID_20 = 20;
8383

84-
private static final Log logger = LogFactory.getLog(RabbitUtils.class); // NOSONAR - lower case
84+
private static final Log LOGGER = LogFactory.getLog(RabbitUtils.class);
8585

8686
private static final ThreadLocal<Boolean> physicalCloseRequired = new ThreadLocal<>(); // NOSONAR - lower case
8787

@@ -99,7 +99,7 @@ public static void closeConnection(@Nullable Connection connection) {
9999
// empty
100100
}
101101
catch (Exception ex) {
102-
logger.debug("Ignoring Connection exception - assuming already closed: " + ex.getMessage(), ex);
102+
LOGGER.debug("Ignoring Connection exception - assuming already closed: " + ex.getMessage(), ex);
103103
}
104104
}
105105
}
@@ -118,15 +118,15 @@ public static void closeChannel(@Nullable Channel channel) {
118118
// empty
119119
}
120120
catch (IOException ex) {
121-
logger.debug("Could not close RabbitMQ Channel", ex);
121+
LOGGER.debug("Could not close RabbitMQ Channel", ex);
122122
}
123123
catch (ShutdownSignalException sig) {
124124
if (!isNormalShutdown(sig)) {
125-
logger.debug("Unexpected exception on closing RabbitMQ Channel", sig);
125+
LOGGER.debug("Unexpected exception on closing RabbitMQ Channel", sig);
126126
}
127127
}
128128
catch (Exception ex) {
129-
logger.debug("Unexpected exception on closing RabbitMQ Channel", ex);
129+
LOGGER.debug("Unexpected exception on closing RabbitMQ Channel", ex);
130130
}
131131
}
132132
}
@@ -182,18 +182,18 @@ public static void closeMessageConsumer(Channel channel, Collection<String> cons
182182
}
183183
}
184184

185-
private static void cancel(Channel channel, String consumerTag) {
185+
public static void cancel(Channel channel, String consumerTag) {
186186
try {
187187
channel.basicCancel(consumerTag);
188188
}
189-
catch (IOException e) {
190-
if (logger.isDebugEnabled()) {
191-
logger.debug("Error performing 'basicCancel'", e);
189+
catch (AlreadyClosedException e) {
190+
if (LOGGER.isTraceEnabled()) {
191+
LOGGER.trace(channel + " is already closed", e);
192192
}
193193
}
194-
catch (AlreadyClosedException e) {
195-
if (logger.isTraceEnabled()) {
196-
logger.trace(channel + " is already closed");
194+
catch (Exception e) {
195+
if (LOGGER.isDebugEnabled()) {
196+
LOGGER.debug("Error performing 'basicCancel' on " + channel, e);
197197
}
198198
}
199199
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1842,14 +1842,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
18421842
}
18431843

18441844
private void cancelConsumerQuietly(Channel channel, DefaultConsumer consumer) {
1845-
try {
1846-
channel.basicCancel(consumer.getConsumerTag());
1847-
}
1848-
catch (Exception e) {
1849-
if (this.logger.isDebugEnabled()) {
1850-
this.logger.debug("Failed to cancel consumer: " + consumer, e);
1851-
}
1852-
}
1845+
RabbitUtils.cancel(channel, consumer.getConsumerTag());
18531846
}
18541847

18551848
@Nullable

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@
6767
import org.springframework.util.backoff.BackOffExecution;
6868

6969
import com.rabbitmq.client.AMQP;
70-
import com.rabbitmq.client.AlreadyClosedException;
7170
import com.rabbitmq.client.Channel;
7271
import com.rabbitmq.client.DefaultConsumer;
7372
import com.rabbitmq.client.Envelope;
@@ -403,20 +402,8 @@ protected void basicCancel() {
403402
protected void basicCancel(boolean expected) {
404403
this.normalCancel = expected;
405404
getConsumerTags().forEach(consumerTag -> {
406-
try {
407-
if (this.channel.isOpen()) {
408-
this.channel.basicCancel(consumerTag);
409-
}
410-
}
411-
catch (IOException | IllegalStateException e) {
412-
if (logger.isDebugEnabled()) {
413-
logger.debug("Error performing 'basicCancel'", e);
414-
}
415-
}
416-
catch (AlreadyClosedException e) {
417-
if (logger.isTraceEnabled()) {
418-
logger.trace(this.channel + " is already closed");
419-
}
405+
if (this.channel.isOpen()) {
406+
RabbitUtils.cancel(this.channel, consumerTag);
420407
}
421408
});
422409
this.cancelled.set(true);
@@ -911,7 +898,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
911898
// Defensive - should never happen
912899
BlockingQueueConsumer.this.queue.clear();
913900
if (!this.canceled) {
914-
getChannel().basicCancel(consumerTag);
901+
RabbitUtils.cancel(channelToClose, consumerTag);
915902
}
916903
try {
917904
channelToClose.close();

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -816,13 +816,15 @@ private void cancelConsumer(SimpleConsumer consumer) {
816816
synchronized (consumer) {
817817
consumer.setCanceled(true);
818818
if (this.messagesPerAck > 1) {
819-
consumer.ackIfNecessary(0L);
819+
try {
820+
consumer.ackIfNecessary(0L);
821+
}
822+
catch (IOException e) {
823+
this.logger.error("Exception while sending delayed ack", e);
824+
}
820825
}
821826
}
822-
consumer.getChannel().basicCancel(consumer.getConsumerTag());
823-
}
824-
catch (IOException e) {
825-
this.logger.error("Failed to cancel consumer: " + consumer, e);
827+
RabbitUtils.cancel(consumer.getChannel(), consumer.getConsumerTag());
826828
}
827829
finally {
828830
this.consumers.remove(consumer);

0 commit comments

Comments
 (0)