@@ -554,7 +554,7 @@ func TestJetStreamClusterConsumerRedeliveredInfo(t *testing.T) {
554
554
}
555
555
556
556
func TestJetStreamClusterConsumerState (t * testing.T ) {
557
- c := createJetStreamClusterExplicit (t , "R3S" , 3 )
557
+ c := createJetStreamClusterExplicit (t , "R3S" , 5 )
558
558
defer c .shutdown ()
559
559
560
560
s := c .randomServer ()
@@ -578,15 +578,27 @@ func TestJetStreamClusterConsumerState(t *testing.T) {
578
578
}
579
579
}
580
580
581
+ // Make sure we are not connected to any of the stream servers so that we do not do client reconnect
582
+ // when we take out the consumer leader.
583
+ if s .JetStreamIsStreamAssigned ("$G" , "TEST" ) {
584
+ nc .Close ()
585
+ for _ , ns := range c .servers {
586
+ if ! ns .JetStreamIsStreamAssigned ("$G" , "TEST" ) {
587
+ s = ns
588
+ nc , js = jsClientConnect (t , s )
589
+ defer nc .Close ()
590
+ break
591
+ }
592
+ }
593
+ }
594
+
581
595
sub , err := js .PullSubscribe ("foo" , "dlc" )
582
596
if err != nil {
583
597
t .Fatalf ("Unexpected error: %v" , err )
584
598
}
585
599
586
600
// Pull 5 messages and ack.
587
- for i := 0 ; i < 5 ; i ++ {
588
- msgs := fetchMsgs (t , sub , 1 , 5 * time .Second )
589
- m := msgs [0 ]
601
+ for _ , m := range fetchMsgs (t , sub , 5 , 5 * time .Second ) {
590
602
m .Ack ()
591
603
}
592
604
@@ -618,11 +630,10 @@ func TestJetStreamClusterConsumerState(t *testing.T) {
618
630
619
631
// Now make sure we can receive new messages.
620
632
// Pull last 5.
621
- for i := 0 ; i < 5 ; i ++ {
622
- msgs := fetchMsgs (t , sub , 1 , 5 * time .Second )
623
- m := msgs [0 ]
633
+ for _ , m := range fetchMsgs (t , sub , 5 , 5 * time .Second ) {
624
634
m .Ack ()
625
635
}
636
+
626
637
nci , _ = sub .ConsumerInfo ()
627
638
if nci .Delivered .Consumer != 10 || nci .Delivered .Stream != 10 {
628
639
t .Fatalf ("Received bad delivered: %+v" , nci .Delivered )
@@ -1835,6 +1846,9 @@ func TestJetStreamClusterExtendedStreamInfo(t *testing.T) {
1835
1846
return nil
1836
1847
})
1837
1848
1849
+ nc , js = jsClientConnect (t , c .randomServer ())
1850
+ defer nc .Close ()
1851
+
1838
1852
// Now do consumer.
1839
1853
sub , err := js .PullSubscribe ("foo" , "dlc" )
1840
1854
if err != nil {
0 commit comments