Skip to content

Commit a9ab53a

Browse files
committed
Add in deliver status for consumer info
Signed-off-by: Derek Collison <[email protected]>
1 parent da3c89e commit a9ab53a

File tree

2 files changed

+148
-9
lines changed

2 files changed

+148
-9
lines changed

server/consumer.go

+45-1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type ConsumerInfo struct {
4343
NumRedelivered int `json:"num_redelivered"`
4444
NumWaiting int `json:"num_waiting"`
4545
NumPending uint64 `json:"num_pending"`
46+
Active *PushActive `json:"active,omitempty"`
4647
Cluster *ClusterInfo `json:"cluster,omitempty"`
4748
}
4849

@@ -69,6 +70,11 @@ type ConsumerConfig struct {
6970
Direct bool `json:"direct,omitempty"`
7071
}
7172

73+
type PushActive struct {
74+
Subject string `json:"subject,omitempty"`
75+
Queue string `json:"queue,omitempty"`
76+
}
77+
7278
type CreateConsumerRequest struct {
7379
Stream string `json:"stream_name"`
7480
Config ConsumerConfig `json:"config"`
@@ -190,6 +196,7 @@ type consumer struct {
190196
asflr uint64
191197
sgap uint64
192198
dsubj string
199+
qgroup string
193200
lss *lastSeqSkipList
194201
rlimit *rate.Limiter
195202
reqSub *subscription
@@ -688,6 +695,27 @@ func (o *consumer) setConsumerAssignment(ca *consumerAssignment) {
688695
}
689696
}
690697

698+
// checkInterest will check on our interest's queue group status.
699+
// Lock should be held.
700+
func (o *consumer) checkQueueInterest() {
701+
if !o.active || o.cfg.DeliverSubject == _EMPTY_ {
702+
return
703+
}
704+
subj := o.dsubj
705+
if subj == _EMPTY_ {
706+
subj = o.cfg.DeliverSubject
707+
}
708+
709+
if rr := o.acc.sl.Match(subj); len(rr.qsubs) > 0 {
710+
// Just grab first
711+
if qsubs := rr.qsubs[0]; len(qsubs) > 0 {
712+
if sub := rr.qsubs[0][0]; len(sub.queue) > 0 {
713+
o.qgroup = string(sub.queue)
714+
}
715+
}
716+
}
717+
}
718+
691719
// Lock should be held.
692720
func (o *consumer) isLeader() bool {
693721
if o.node != nil {
@@ -763,6 +791,8 @@ func (o *consumer) setLeader(isLeader bool) {
763791
stopAndClearTimer(&o.gwdtmr)
764792
o.gwdtmr = time.AfterFunc(time.Second, func() { o.watchGWinterest() })
765793
}
794+
} else {
795+
o.checkQueueInterest()
766796
}
767797
}
768798

@@ -968,7 +998,12 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) bool {
968998
if interest && !o.active {
969999
o.signalNewMessages()
9701000
}
971-
o.active = interest
1001+
// Update active status, if not active clear any queue group we captured.
1002+
if o.active = interest; !o.active {
1003+
o.qgroup = _EMPTY_
1004+
} else {
1005+
o.checkQueueInterest()
1006+
}
9721007

9731008
// If the delete timer has already been set do not clear here and return.
9741009
if o.dtmr != nil && !o.isDurable() && !interest {
@@ -1433,13 +1468,21 @@ func (o *consumer) writeStoreState() error {
14331468

14341469
// Info returns our current consumer state.
14351470
func (o *consumer) info() *ConsumerInfo {
1471+
var pa *PushActive
1472+
14361473
o.mu.RLock()
14371474
mset := o.mset
14381475
if mset == nil || mset.srv == nil {
14391476
o.mu.RUnlock()
14401477
return nil
14411478
}
14421479
js := o.js
1480+
if o.active {
1481+
pa = &PushActive{
1482+
Subject: o.dsubj,
1483+
Queue: o.qgroup,
1484+
}
1485+
}
14431486
o.mu.RUnlock()
14441487

14451488
if js == nil {
@@ -1468,6 +1511,7 @@ func (o *consumer) info() *ConsumerInfo {
14681511
NumAckPending: len(o.pending),
14691512
NumRedelivered: len(o.rdc),
14701513
NumPending: o.adjustedPending(),
1514+
Active: pa,
14711515
Cluster: ci,
14721516
}
14731517
// If we are a pull mode consumer, report on number of waiting requests.

server/jetstream_test.go

+103-8
Original file line numberDiff line numberDiff line change
@@ -11801,10 +11801,6 @@ func TestJetStreamDomainInPubAck(t *testing.T) {
1180111801
s, _ := RunServerWithConfig(conf)
1180211802
defer s.Shutdown()
1180311803

11804-
if !s.JetStreamEnabled() {
11805-
t.Fatalf("Expected JetStream to be enabled")
11806-
}
11807-
1180811804
config := s.JetStreamConfig()
1180911805
if config != nil {
1181011806
defer removeDir(t, config.StoreDir)
@@ -11814,10 +11810,9 @@ func TestJetStreamDomainInPubAck(t *testing.T) {
1181411810
defer nc.Close()
1181511811

1181611812
cfg := &nats.StreamConfig{
11817-
Name: "TEST",
11818-
Storage: nats.MemoryStorage,
11819-
Subjects: []string{"foo"},
11820-
MaxConsumers: 1,
11813+
Name: "TEST",
11814+
Storage: nats.MemoryStorage,
11815+
Subjects: []string{"foo"},
1182111816
}
1182211817
if _, err := js.AddStream(cfg); err != nil {
1182311818
t.Fatalf("Unexpected error: %v", err)
@@ -11835,6 +11830,106 @@ func TestJetStreamDomainInPubAck(t *testing.T) {
1183511830
}
1183611831
}
1183711832

11833+
func TestJetStreamPushConsumerInfo(t *testing.T) {
11834+
s := RunBasicJetStreamServer()
11835+
defer s.Shutdown()
11836+
11837+
config := s.JetStreamConfig()
11838+
if config != nil {
11839+
defer removeDir(t, config.StoreDir)
11840+
}
11841+
11842+
nc, js := jsClientConnect(t, s)
11843+
defer nc.Close()
11844+
11845+
cfg := &nats.StreamConfig{
11846+
Name: "TEST",
11847+
Storage: nats.MemoryStorage,
11848+
Subjects: []string{"foo"},
11849+
}
11850+
if _, err := js.AddStream(cfg); err != nil {
11851+
t.Fatalf("Unexpected error: %v", err)
11852+
}
11853+
11854+
// We want to test extended consumer info for push based consumers.
11855+
// We need to do these by hand for now.
11856+
createConsumer := func(name, deliver string) {
11857+
t.Helper()
11858+
creq := CreateConsumerRequest{
11859+
Stream: "TEST",
11860+
Config: ConsumerConfig{
11861+
Durable: name,
11862+
DeliverSubject: deliver,
11863+
},
11864+
}
11865+
req, err := json.Marshal(creq)
11866+
if err != nil {
11867+
t.Fatalf("Unexpected error: %v", err)
11868+
}
11869+
resp, err := nc.Request(fmt.Sprintf(JSApiDurableCreateT, "TEST", name), req, time.Second)
11870+
if err != nil {
11871+
t.Fatalf("Unexpected error: %v", err)
11872+
}
11873+
var ccResp JSApiConsumerCreateResponse
11874+
if err := json.Unmarshal(resp.Data, &ccResp); err != nil {
11875+
t.Fatalf("Unexpected error: %v", err)
11876+
}
11877+
if ccResp.ConsumerInfo == nil || ccResp.Error != nil {
11878+
t.Fatalf("Got a bad response %+v", ccResp)
11879+
}
11880+
}
11881+
11882+
consumerInfo := func(name string) *ConsumerInfo {
11883+
t.Helper()
11884+
resp, err := nc.Request(fmt.Sprintf(JSApiConsumerInfoT, "TEST", name), nil, time.Second)
11885+
if err != nil {
11886+
t.Fatalf("Unexpected error: %v", err)
11887+
}
11888+
var cinfo JSApiConsumerInfoResponse
11889+
if err := json.Unmarshal(resp.Data, &cinfo); err != nil {
11890+
t.Fatalf("Unexpected error: %v", err)
11891+
}
11892+
if cinfo.ConsumerInfo == nil || cinfo.Error != nil {
11893+
t.Fatalf("Got a bad response %+v", cinfo)
11894+
}
11895+
return cinfo.ConsumerInfo
11896+
}
11897+
11898+
// First create a durable push and make sure we show now active status.
11899+
createConsumer("dlc", "d.X")
11900+
if ci := consumerInfo("dlc"); ci.Active != nil {
11901+
t.Fatalf("Expected active to be nil, got %+v\n", ci.Active)
11902+
}
11903+
// Now bind the deliver subject.
11904+
sub, _ := nc.SubscribeSync("d.X")
11905+
nc.Flush() // Make sure it registers.
11906+
// Check that its reported.
11907+
if ci := consumerInfo("dlc"); ci.Active == nil || ci.Active.Subject != "d.X" {
11908+
t.Fatalf("Expected active to be set and have subject %q, got %+v\n", "d.X", ci.Active)
11909+
}
11910+
sub.Unsubscribe()
11911+
nc.Flush() // Make sure it registers.
11912+
if ci := consumerInfo("dlc"); ci.Active != nil {
11913+
t.Fatalf("Expected active to be nil, got %+v\n", ci.Active)
11914+
}
11915+
11916+
// Now make sure we have queue groups indictated as needed.
11917+
createConsumer("ik", "d.Z")
11918+
// Now bind the deliver subject with a queue group.
11919+
sub, _ = nc.QueueSubscribeSync("d.Z", "g22")
11920+
defer sub.Unsubscribe()
11921+
nc.Flush() // Make sure it registers.
11922+
// Check that queue group reported.
11923+
if ci := consumerInfo("ik"); ci.Active == nil || ci.Active.Subject != "d.Z" || ci.Active.Queue != "g22" {
11924+
t.Fatalf("Expected active to be set and have subject %q and queue %q, got %+v\n", "d.Z", "g22", ci.Active)
11925+
}
11926+
sub.Unsubscribe()
11927+
nc.Flush() // Make sure it registers.
11928+
if ci := consumerInfo("ik"); ci.Active != nil {
11929+
t.Fatalf("Expected active to be nil, got %+v\n", ci.Active)
11930+
}
11931+
}
11932+
1183811933
// Issue #2213
1183911934
func TestJetStreamDirectConsumersBeingReported(t *testing.T) {
1184011935
s := RunBasicJetStreamServer()

0 commit comments

Comments
 (0)