25
25
26
26
import org .apache .commons .logging .Log ;
27
27
import org .apache .commons .logging .LogFactory ;
28
- import org .junit . jupiter . api . AfterEach ;
29
- import org .junit .jupiter . api . BeforeEach ;
30
- import org .junit .jupiter . api . RepeatedTest ;
31
- import org .junit .jupiter . api . RepetitionInfo ;
32
- import org .junit .jupiter . api . Test ;
28
+ import org .apache . logging . log4j . Level ;
29
+ import org .junit .After ;
30
+ import org .junit .Before ;
31
+ import org .junit .Rule ;
32
+ import org .junit .Test ;
33
33
34
34
import org .springframework .amqp .core .AcknowledgeMode ;
35
35
import org .springframework .amqp .core .Message ;
36
36
import org .springframework .amqp .core .Queue ;
37
37
import org .springframework .amqp .rabbit .connection .CachingConnectionFactory ;
38
38
import org .springframework .amqp .rabbit .connection .ConnectionFactory ;
39
39
import org .springframework .amqp .rabbit .core .RabbitTemplate ;
40
+ import org .springframework .amqp .rabbit .junit .BrokerRunning ;
40
41
import org .springframework .amqp .rabbit .junit .BrokerTestUtils ;
41
- import org .springframework .amqp .rabbit .junit .RabbitAvailable ;
42
+ import org .springframework .amqp .rabbit .junit .LogLevelAdjuster ;
42
43
import org .springframework .amqp .rabbit .listener .adapter .MessageListenerAdapter ;
43
44
import org .springframework .amqp .rabbit .listener .api .ChannelAwareMessageListener ;
44
45
import org .springframework .amqp .rabbit .listener .exception .FatalListenerExecutionException ;
46
+ import org .springframework .amqp .rabbit .test .RepeatProcessor ;
45
47
import org .springframework .beans .factory .DisposableBean ;
48
+ import org .springframework .test .annotation .Repeat ;
46
49
47
50
import com .rabbitmq .client .Channel ;
48
51
54
57
* @author Gary Russell
55
58
*
56
59
*/
57
- @ RabbitAvailable (queues = { "test.queue" , "test.send" })
58
60
public class MessageListenerRecoveryRepeatIntegrationTests {
59
61
60
62
private static Log logger = LogFactory .getLog (MessageListenerRecoveryRepeatIntegrationTests .class );
@@ -75,48 +77,48 @@ public class MessageListenerRecoveryRepeatIntegrationTests {
75
77
76
78
private SimpleMessageListenerContainer container ;
77
79
78
- // @Rule
79
- // public LogLevelAdjuster logLevels = new LogLevelAdjuster(Level.ERROR, RabbitTemplate.class,
80
- // ConditionalRejectingErrorHandler.class,
81
- // SimpleMessageListenerContainer.class, BlockingQueueConsumer.class, MessageListenerRecoveryRepeatIntegrationTests.class);
82
- //
83
- // @Rule
84
- // public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(queue.getName(), sendQueue.getName());
85
- //
86
- // @Rule
87
- // public RepeatProcessor repeatProcessor = new RepeatProcessor();
80
+ @ Rule
81
+ public LogLevelAdjuster logLevels = new LogLevelAdjuster (Level .ERROR , RabbitTemplate .class ,
82
+ ConditionalRejectingErrorHandler .class ,
83
+ SimpleMessageListenerContainer .class , BlockingQueueConsumer .class , MessageListenerRecoveryRepeatIntegrationTests .class );
84
+
85
+ @ Rule
86
+ public BrokerRunning brokerIsRunning = BrokerRunning .isRunningWithEmptyQueues (queue .getName (), sendQueue .getName ());
87
+
88
+ @ Rule
89
+ public RepeatProcessor repeatProcessor = new RepeatProcessor ();
88
90
89
91
private CloseConnectionListener listener ;
90
92
91
93
private ConnectionFactory connectionFactory ;
92
94
93
- @ BeforeEach
94
- public void init (RepetitionInfo info ) {
95
- // if (!repeatProcessor.isInitialized()) {
95
+ @ Before
96
+ public void init () {
97
+ if (!repeatProcessor .isInitialized ()) {
96
98
logger .info ("Initializing at start of test" );
97
99
connectionFactory = createConnectionFactory ();
98
100
listener = new CloseConnectionListener ();
99
- // }
101
+ }
100
102
}
101
103
102
- @ AfterEach
104
+ @ After
103
105
public void clear () throws Exception {
104
- // if (repeatProcessor.isFinalizing()) {
106
+ if (repeatProcessor .isFinalizing ()) {
105
107
// Wait for broker communication to finish before trying to stop container
106
- // Thread.sleep(300L);
108
+ Thread .sleep (300L );
107
109
logger .info ("Shutting down at end of test" );
108
110
if (container != null ) {
109
111
container .shutdown ();
110
112
}
111
113
if (connectionFactory != null ) {
112
114
((DisposableBean ) connectionFactory ).destroy ();
113
115
}
114
- // this.brokerIsRunning.removeTestQueues();
115
- // }
116
+ this .brokerIsRunning .removeTestQueues ();
117
+ }
116
118
}
117
119
118
120
@ Test
119
- @ RepeatedTest (1000 )
121
+ @ Repeat (1000 )
120
122
public void testListenerRecoversFromClosedConnection () throws Exception {
121
123
if (this .container == null ) {
122
124
this .container = createContainer (queue .getName (), listener , connectionFactory );
@@ -161,7 +163,6 @@ private SimpleMessageListenerContainer createContainer(String queueName, Object
161
163
container .setChannelTransacted (transactional );
162
164
container .setAcknowledgeMode (acknowledgeMode );
163
165
container .setTaskExecutor (Executors .newFixedThreadPool (concurrentConsumers ));
164
- container .setReceiveTimeout (20L );
165
166
container .afterPropertiesSet ();
166
167
container .start ();
167
168
return container ;
0 commit comments