55
55
import org .springframework .amqp .core .MessageDeliveryMode ;
56
56
import org .springframework .amqp .core .MessageProperties ;
57
57
import org .springframework .amqp .core .TopicExchange ;
58
- import org .springframework .amqp .rabbit .connection .AbstractConnectionFactory ;
59
58
import org .springframework .amqp .rabbit .connection .CachingConnectionFactory ;
60
59
import org .springframework .amqp .rabbit .connection .RabbitConnectionFactoryBean ;
61
60
import org .springframework .amqp .rabbit .core .DeclareExchangeConnectionListener ;
62
61
import org .springframework .amqp .rabbit .core .RabbitAdmin ;
63
62
import org .springframework .amqp .rabbit .core .RabbitTemplate ;
64
63
import org .springframework .amqp .rabbit .support .LogAppenderUtils ;
64
+ import org .springframework .context .ApplicationContext ;
65
+ import org .springframework .context .event .ContextClosedEvent ;
66
+ import org .springframework .context .support .GenericApplicationContext ;
65
67
import org .springframework .retry .RetryPolicy ;
66
68
import org .springframework .retry .policy .SimpleRetryPolicy ;
67
69
import org .springframework .retry .support .RetryTemplate ;
@@ -308,6 +310,9 @@ protected void doSend(final Event event, LogEvent logEvent, MessageProperties am
308
310
message = postProcessMessageBeforeSend (message , event );
309
311
this .rabbitTemplate .send (this .manager .exchangeName , routingKey , message );
310
312
}
313
+ catch (IllegalStateException e ) {
314
+ getHandler ().error ("Could not send log message " + logEvent .getMessage () + " appender is stopped" );
315
+ }
311
316
catch (AmqpException e ) {
312
317
int retries = event .incrementRetries ();
313
318
if (this .manager .async && retries < this .manager .maxSenderRetries ) {
@@ -334,7 +339,7 @@ public void run() {
334
339
@ Override
335
340
protected boolean stop (long timeout , TimeUnit timeUnit , boolean changeLifeCycleState ) {
336
341
boolean stopped = super .stop (timeout , timeUnit , changeLifeCycleState );
337
- return stopped & this .manager .stop (timeout , timeUnit );
342
+ return this .manager .stop (timeout , timeUnit ) || stopped ;
338
343
}
339
344
340
345
/**
@@ -397,6 +402,8 @@ public int incrementRetries() {
397
402
398
403
protected static class AmqpManager extends AbstractManager {
399
404
405
+ private final ApplicationContext context = new GenericApplicationContext ();
406
+
400
407
/**
401
408
* True to send events on separate threads.
402
409
*/
@@ -440,7 +447,7 @@ protected static class AmqpManager extends AbstractManager {
440
447
/**
441
448
* RabbitMQ ConnectionFactory.
442
449
*/
443
- private AbstractConnectionFactory connectionFactory ;
450
+ private CachingConnectionFactory connectionFactory ;
444
451
445
452
/**
446
453
* RabbitMQ host to connect to.
@@ -537,6 +544,7 @@ private boolean activateOptions() {
537
544
.replaceAll ("%X\\ {applicationId\\ }" , this .applicationId ),
538
545
null , null , null , Charset .forName (this .charset ), false , true , null , null );
539
546
this .connectionFactory = new CachingConnectionFactory (createRabbitConnectionFactory ());
547
+ this .connectionFactory .setApplicationContext (this .context );
540
548
if (this .addresses != null ) {
541
549
this .connectionFactory .setAddresses (this .addresses );
542
550
}
@@ -588,6 +596,7 @@ protected boolean releaseSub(long timeout, TimeUnit timeUnit) {
588
596
this .retryTimer .cancel ();
589
597
this .senderPool .shutdownNow ();
590
598
this .connectionFactory .destroy ();
599
+ this .connectionFactory .onApplicationEvent (new ContextClosedEvent (this .context ));
591
600
try {
592
601
return this .senderPool .awaitTermination (timeout , timeUnit );
593
602
}
0 commit comments