@@ -11801,10 +11801,6 @@ func TestJetStreamDomainInPubAck(t *testing.T) {
11801
11801
s , _ := RunServerWithConfig (conf )
11802
11802
defer s .Shutdown ()
11803
11803
11804
- if ! s .JetStreamEnabled () {
11805
- t .Fatalf ("Expected JetStream to be enabled" )
11806
- }
11807
-
11808
11804
config := s .JetStreamConfig ()
11809
11805
if config != nil {
11810
11806
defer removeDir (t , config .StoreDir )
@@ -11814,10 +11810,9 @@ func TestJetStreamDomainInPubAck(t *testing.T) {
11814
11810
defer nc .Close ()
11815
11811
11816
11812
cfg := & nats.StreamConfig {
11817
- Name : "TEST" ,
11818
- Storage : nats .MemoryStorage ,
11819
- Subjects : []string {"foo" },
11820
- MaxConsumers : 1 ,
11813
+ Name : "TEST" ,
11814
+ Storage : nats .MemoryStorage ,
11815
+ Subjects : []string {"foo" },
11821
11816
}
11822
11817
if _ , err := js .AddStream (cfg ); err != nil {
11823
11818
t .Fatalf ("Unexpected error: %v" , err )
@@ -11835,6 +11830,106 @@ func TestJetStreamDomainInPubAck(t *testing.T) {
11835
11830
}
11836
11831
}
11837
11832
11833
+ func TestJetStreamPushConsumerInfo (t * testing.T ) {
11834
+ s := RunBasicJetStreamServer ()
11835
+ defer s .Shutdown ()
11836
+
11837
+ config := s .JetStreamConfig ()
11838
+ if config != nil {
11839
+ defer removeDir (t , config .StoreDir )
11840
+ }
11841
+
11842
+ nc , js := jsClientConnect (t , s )
11843
+ defer nc .Close ()
11844
+
11845
+ cfg := & nats.StreamConfig {
11846
+ Name : "TEST" ,
11847
+ Storage : nats .MemoryStorage ,
11848
+ Subjects : []string {"foo" },
11849
+ }
11850
+ if _ , err := js .AddStream (cfg ); err != nil {
11851
+ t .Fatalf ("Unexpected error: %v" , err )
11852
+ }
11853
+
11854
+ // We want to test extended consumer info for push based consumers.
11855
+ // We need to do these by hand for now.
11856
+ createConsumer := func (name , deliver string ) {
11857
+ t .Helper ()
11858
+ creq := CreateConsumerRequest {
11859
+ Stream : "TEST" ,
11860
+ Config : ConsumerConfig {
11861
+ Durable : name ,
11862
+ DeliverSubject : deliver ,
11863
+ },
11864
+ }
11865
+ req , err := json .Marshal (creq )
11866
+ if err != nil {
11867
+ t .Fatalf ("Unexpected error: %v" , err )
11868
+ }
11869
+ resp , err := nc .Request (fmt .Sprintf (JSApiDurableCreateT , "TEST" , name ), req , time .Second )
11870
+ if err != nil {
11871
+ t .Fatalf ("Unexpected error: %v" , err )
11872
+ }
11873
+ var ccResp JSApiConsumerCreateResponse
11874
+ if err := json .Unmarshal (resp .Data , & ccResp ); err != nil {
11875
+ t .Fatalf ("Unexpected error: %v" , err )
11876
+ }
11877
+ if ccResp .ConsumerInfo == nil || ccResp .Error != nil {
11878
+ t .Fatalf ("Got a bad response %+v" , ccResp )
11879
+ }
11880
+ }
11881
+
11882
+ consumerInfo := func (name string ) * ConsumerInfo {
11883
+ t .Helper ()
11884
+ resp , err := nc .Request (fmt .Sprintf (JSApiConsumerInfoT , "TEST" , name ), nil , time .Second )
11885
+ if err != nil {
11886
+ t .Fatalf ("Unexpected error: %v" , err )
11887
+ }
11888
+ var cinfo JSApiConsumerInfoResponse
11889
+ if err := json .Unmarshal (resp .Data , & cinfo ); err != nil {
11890
+ t .Fatalf ("Unexpected error: %v" , err )
11891
+ }
11892
+ if cinfo .ConsumerInfo == nil || cinfo .Error != nil {
11893
+ t .Fatalf ("Got a bad response %+v" , cinfo )
11894
+ }
11895
+ return cinfo .ConsumerInfo
11896
+ }
11897
+
11898
+ // First create a durable push and make sure we show now active status.
11899
+ createConsumer ("dlc" , "d.X" )
11900
+ if ci := consumerInfo ("dlc" ); ci .Active != nil {
11901
+ t .Fatalf ("Expected active to be nil, got %+v\n " , ci .Active )
11902
+ }
11903
+ // Now bind the deliver subject.
11904
+ sub , _ := nc .SubscribeSync ("d.X" )
11905
+ nc .Flush () // Make sure it registers.
11906
+ // Check that its reported.
11907
+ if ci := consumerInfo ("dlc" ); ci .Active == nil || ci .Active .Subject != "d.X" {
11908
+ t .Fatalf ("Expected active to be set and have subject %q, got %+v\n " , "d.X" , ci .Active )
11909
+ }
11910
+ sub .Unsubscribe ()
11911
+ nc .Flush () // Make sure it registers.
11912
+ if ci := consumerInfo ("dlc" ); ci .Active != nil {
11913
+ t .Fatalf ("Expected active to be nil, got %+v\n " , ci .Active )
11914
+ }
11915
+
11916
+ // Now make sure we have queue groups indictated as needed.
11917
+ createConsumer ("ik" , "d.Z" )
11918
+ // Now bind the deliver subject with a queue group.
11919
+ sub , _ = nc .QueueSubscribeSync ("d.Z" , "g22" )
11920
+ defer sub .Unsubscribe ()
11921
+ nc .Flush () // Make sure it registers.
11922
+ // Check that queue group reported.
11923
+ if ci := consumerInfo ("ik" ); ci .Active == nil || ci .Active .Subject != "d.Z" || ci .Active .Queue != "g22" {
11924
+ t .Fatalf ("Expected active to be set and have subject %q and queue %q, got %+v\n " , "d.Z" , "g22" , ci .Active )
11925
+ }
11926
+ sub .Unsubscribe ()
11927
+ nc .Flush () // Make sure it registers.
11928
+ if ci := consumerInfo ("ik" ); ci .Active != nil {
11929
+ t .Fatalf ("Expected active to be nil, got %+v\n " , ci .Active )
11930
+ }
11931
+ }
11932
+
11838
11933
// Issue #2213
11839
11934
func TestJetStreamDirectConsumersBeingReported (t * testing.T ) {
11840
11935
s := RunBasicJetStreamServer ()
0 commit comments