39
39
import com .rabbitmq .client .amqp .impl .TestUtils .Sync ;
40
40
import java .util .List ;
41
41
import java .util .Set ;
42
- import java .util .concurrent .*;
42
+ import java .util .concurrent .ArrayBlockingQueue ;
43
+ import java .util .concurrent .BlockingQueue ;
44
+ import java .util .concurrent .ConcurrentHashMap ;
45
+ import java .util .concurrent .CopyOnWriteArrayList ;
46
+ import java .util .concurrent .CountDownLatch ;
47
+ import java .util .concurrent .TimeUnit ;
43
48
import java .util .concurrent .atomic .AtomicInteger ;
44
49
import java .util .stream .IntStream ;
45
50
import org .assertj .core .api .Assertions ;
46
- import org .junit .jupiter .api .*;
51
+ import org .junit .jupiter .api .BeforeEach ;
52
+ import org .junit .jupiter .api .Test ;
53
+ import org .junit .jupiter .api .TestInfo ;
47
54
import org .junit .jupiter .params .ParameterizedTest ;
48
55
import org .junit .jupiter .params .provider .ValueSource ;
49
56
import org .slf4j .Logger ;
@@ -649,21 +656,6 @@ void shouldRecoverEvenIfManagementIsClosed(TestInfo info) {
649
656
Management management = connection .management ();
650
657
Management .QueueInfo queueInfo = management .queue ().exclusive (true ).declare ();
651
658
String q = queueInfo .name ();
652
- Publisher publisher = connection .publisherBuilder ().queue (q ).build ();
653
- Sync publishSync = TestUtils .sync ();
654
- Publisher .Callback callback =
655
- ctx -> {
656
- if (ctx .status () == ACCEPTED ) {
657
- publishSync .down ();
658
- } else {
659
- LOGGER .warn (
660
- "Unexpected status: {} ({})" , ctx .status (), info .getTestMethod ().get ().getName ());
661
- }
662
- };
663
- publisher .publish (publisher .message (), callback );
664
- assertThat (publishSync ).completes ();
665
-
666
- waitAtMost (() -> management .queueInfo (q ).messageCount () == 1 );
667
659
668
660
Sync consumeSync = TestUtils .sync ();
669
661
connection
@@ -676,16 +668,28 @@ void shouldRecoverEvenIfManagementIsClosed(TestInfo info) {
676
668
})
677
669
.build ();
678
670
679
- assertThat (consumeSync ).completes ();
680
- waitAtMost (() -> management .queueInfo (q ).messageCount () == 0 );
681
- management .close ();
671
+ waitAtMost (() -> management .queueInfo (q ).consumerCount () == 1 );
682
672
683
- publishSync .reset ();
684
- consumeSync .reset ();
673
+ management .close ();
685
674
686
675
closeConnectionAndWaitForRecovery ();
676
+
677
+ Publisher publisher = connection .publisherBuilder ().queue (q ).build ();
678
+ Sync publishSync = TestUtils .sync ();
679
+ Publisher .Callback callback =
680
+ ctx -> {
681
+ if (ctx .status () == ACCEPTED ) {
682
+ publishSync .down ();
683
+ } else {
684
+ LOGGER .warn (
685
+ "Unexpected status: {} ({})" , ctx .status (), info .getTestMethod ().get ().getName ());
686
+ }
687
+ };
687
688
publisher .publish (publisher .message (), callback );
689
+ assertThat (publishSync ).completes ();
690
+
688
691
assertThat (consumeSync ).completes ();
692
+
689
693
assertThatThrownBy (() -> management .queueInfo (q ))
690
694
.isInstanceOf (AmqpResourceClosedException .class );
691
695
assertThat (connection .management ().queueInfo (q )).isEmpty ();
0 commit comments