Skip to content

Commit f441c7b

Browse files
Merge pull request #2354 from nats-io/maxcc
Fix for multiple concurrent ephemeral consumer requests with max consumers set.
2 parents 2534434 + 065049e commit f441c7b

File tree

2 files changed

+60
-8
lines changed

2 files changed

+60
-8
lines changed

server/jetstream_cluster.go

+5-7
Original file line numberDiff line numberDiff line change
@@ -4050,14 +4050,12 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
40504050
ca := &consumerAssignment{Group: rg, Stream: stream, Name: oname, Config: cfg, Subject: subject, Reply: reply, Client: ci, Created: time.Now().UTC()}
40514051
eca := encodeAddConsumerAssignment(ca)
40524052

4053-
// Mark this as pending if a durable.
4054-
if isDurableConsumer(cfg) {
4055-
if sa.consumers == nil {
4056-
sa.consumers = make(map[string]*consumerAssignment)
4057-
}
4058-
ca.pending = true
4059-
sa.consumers[ca.Name] = ca
4053+
// Mark this as pending.
4054+
if sa.consumers == nil {
4055+
sa.consumers = make(map[string]*consumerAssignment)
40604056
}
4057+
ca.pending = true
4058+
sa.consumers[ca.Name] = ca
40614059

40624060
// Do formal proposal.
40634061
cc.meta.Propose(eca)

server/jetstream_cluster_test.go

+55-1
Original file line numberDiff line numberDiff line change
@@ -7340,7 +7340,7 @@ func TestJetStreamClusterSourceAndMirrorConsumersLeaderChange(t *testing.T) {
73407340

73417341
// Now make sure we only have a single direct consumer on our origin streams.
73427342
// Pick one at random.
7343-
name := fmt.Sprintf("O%d", rand.Intn(numStreams))
7343+
name := fmt.Sprintf("O%d", rand.Intn(numStreams-1)+1)
73447344
c1.waitOnStreamLeader("$G", name)
73457345
s := c1.streamLeader("$G", name)
73467346
a, err := s.lookupAccount("$G")
@@ -7351,6 +7351,7 @@ func TestJetStreamClusterSourceAndMirrorConsumersLeaderChange(t *testing.T) {
73517351
if err != nil {
73527352
t.Fatalf("Unexpected error: %v", err)
73537353
}
7354+
73547355
checkFor(t, 10*time.Second, 250*time.Millisecond, func() error {
73557356
if ndc := mset.numDirectConsumers(); ndc != 1 {
73567357
return fmt.Errorf("Stream %q wanted 1 direct consumer, got %d", name, ndc)
@@ -7640,6 +7641,59 @@ func TestJetStreamClusterMaxConsumers(t *testing.T) {
76407641
}
76417642
}
76427643

7644+
func TestJetStreamClusterMaxConsumersMultipleConcurrentRequests(t *testing.T) {
7645+
c := createJetStreamClusterExplicit(t, "JSC", 3)
7646+
defer c.shutdown()
7647+
7648+
nc, js := jsClientConnect(t, c.randomServer())
7649+
defer nc.Close()
7650+
7651+
cfg := &nats.StreamConfig{
7652+
Name: "MAXCC",
7653+
Storage: nats.MemoryStorage,
7654+
Subjects: []string{"in.maxcc.>"},
7655+
MaxConsumers: 1,
7656+
Replicas: 3,
7657+
}
7658+
if _, err := js.AddStream(cfg); err != nil {
7659+
t.Fatalf("Unexpected error: %v", err)
7660+
}
7661+
si, err := js.StreamInfo("MAXCC")
7662+
if err != nil {
7663+
t.Fatalf("Unexpected error: %v", err)
7664+
}
7665+
if si.Config.MaxConsumers != 1 {
7666+
t.Fatalf("Expected max of 1, got %d", si.Config.MaxConsumers)
7667+
}
7668+
7669+
startCh := make(chan bool)
7670+
var wg sync.WaitGroup
7671+
7672+
for n := 0; n < 10; n++ {
7673+
wg.Add(1)
7674+
go func() {
7675+
defer wg.Done()
7676+
nc, js := jsClientConnect(t, c.randomServer())
7677+
defer nc.Close()
7678+
<-startCh
7679+
js.SubscribeSync("in.maxcc.foo")
7680+
}()
7681+
}
7682+
// Wait for Go routines.
7683+
time.Sleep(250 * time.Millisecond)
7684+
7685+
close(startCh)
7686+
wg.Wait()
7687+
7688+
var names []string
7689+
for n := range js.ConsumerNames("MAXCC") {
7690+
names = append(names, n)
7691+
}
7692+
if nc := len(names); nc > 1 {
7693+
t.Fatalf("Expected only 1 consumer, got %d", nc)
7694+
}
7695+
}
7696+
76437697
// Support functions
76447698

76457699
// Used to setup superclusters for tests.

0 commit comments

Comments
 (0)