Commit c449a96

[FIXED] Consumer reset on stream snapshot failure (#6796)
When a stream follower requires a catchup from the leader but is unable to complete it, `mset.resetClusteredState` is called. However, this also caused the replicated consumer state to be deleted, which isn't needed, as the node can simply be restarted.

Signed-off-by: Maurice van Veen <[email protected]>
2 parents 608a33a + 82cc5ec commit c449a96
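For context, here is a minimal sketch of the distinction the fix relies on. Only the method names `Stop` and `Delete` come from the diff below; the struct, fields, and method bodies are illustrative assumptions, not nats-server's actual Raft implementation. The assumed semantics: `Delete` removes the node's persisted replicated state, while `Stop` merely halts the node, leaving that state on disk so a restarted server can resume from it.

// Illustrative sketch only; not the nats-server implementation.
// Assumed semantics: Stop halts a Raft node but keeps its persisted
// state on disk; Delete halts it and removes that state as well.
package main

import "fmt"

// raftNode is a hypothetical stand-in for a consumer's Raft node.
type raftNode struct {
	running bool
	// state stands in for the replicated consumer state that should
	// survive a restart (e.g. delivered and ack floors).
	state map[string]uint64
}

// Stop halts processing but leaves the persisted state intact, so a
// restarted node can pick up where it left off.
func (n *raftNode) Stop() { n.running = false }

// Delete halts processing and discards the persisted state; after a
// restart the node would have to be rebuilt from scratch.
func (n *raftNode) Delete() {
	n.running = false
	n.state = nil
}

func main() {
	n := &raftNode{running: true, state: map[string]uint64{"ackFloor": 2}}

	n.Stop()
	fmt.Println(n.state) // state survives: map[ackFloor:2]

	n.Delete()
	fmt.Println(n.state) // state gone: map[]
}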

2 files changed: +94 -1 lines changed

server/jetstream_cluster_4_test.go (+93)

@@ -3439,6 +3439,99 @@ func TestJetStreamClusterDesyncAfterErrorDuringCatchup(t *testing.T) {
 	}
 }
 
+func TestJetStreamClusterConsumerDesyncAfterErrorDuringStreamCatchup(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+
+	ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
+		Durable:   "CONSUMER",
+		AckPolicy: nats.AckExplicitPolicy,
+	})
+	require_NoError(t, err)
+
+	consumerLeader := ci.Cluster.Leader
+	consumerLeaderServer := c.serverByName(consumerLeader)
+	nc.Close()
+	nc, js = jsClientConnect(t, consumerLeaderServer)
+	defer nc.Close()
+
+	servers := slices.DeleteFunc([]string{"S-1", "S-2", "S-3"}, func(s string) bool {
+		return s == consumerLeader
+	})
+
+	// Publish 1 message, consume, and ack it.
+	pubAck, err := js.Publish("foo", []byte("ok"))
+	require_NoError(t, err)
+	require_Equal(t, pubAck.Sequence, 1)
+	checkFor(t, time.Second, 100*time.Millisecond, func() error {
+		return checkState(t, c, globalAccountName, "TEST")
+	})
+
+	sub, err := js.PullSubscribe("foo", "CONSUMER")
+	require_NoError(t, err)
+	defer sub.Drain()
+
+	msgs, err := sub.Fetch(1)
+	require_NoError(t, err)
+	require_Len(t, len(msgs), 1)
+	require_NoError(t, msgs[0].AckSync())
+
+	outdatedServerName := servers[0]
+	clusterResetServerName := servers[1]
+
+	outdatedServer := c.serverByName(outdatedServerName)
+	outdatedServer.Shutdown()
+	outdatedServer.WaitForShutdown()
+
+	// Publish and ack another message, one server will be behind.
+	pubAck, err = js.Publish("foo", []byte("ok"))
+	require_NoError(t, err)
+	require_Equal(t, pubAck.Sequence, 2)
+	checkFor(t, time.Second, 100*time.Millisecond, func() error {
+		return checkState(t, c, globalAccountName, "TEST")
+	})
+
+	msgs, err = sub.Fetch(1)
+	require_NoError(t, err)
+	require_Len(t, len(msgs), 1)
+	require_NoError(t, msgs[0].AckSync())
+
+	// We will not need the client anymore.
+	nc.Close()
+
+	// Shutdown consumer leader so one server remains.
+	consumerLeaderServer.Shutdown()
+	consumerLeaderServer.WaitForShutdown()
+
+	clusterResetServer := c.serverByName(clusterResetServerName)
+	acc, err := clusterResetServer.lookupAccount(globalAccountName)
+	require_NoError(t, err)
+	mset, err := acc.lookupStream("TEST")
+	require_NoError(t, err)
+
+	// Run error condition.
+	mset.resetClusteredState(nil)
+
+	// Consumer leader stays offline, we only start the server with missing stream/consumer data.
+	// We expect that the reset server must not allow the outdated server to become leader, as that would result in desync.
+	c.restartServer(outdatedServer)
+	c.waitOnConsumerLeader(globalAccountName, "TEST", "CONSUMER")
+
+	// Outdated server must NOT become the leader.
+	newConsumerLeaderServer := c.consumerLeader(globalAccountName, "TEST", "CONSUMER")
+	require_Equal(t, newConsumerLeaderServer.Name(), clusterResetServerName)
+}
+
 func TestJetStreamClusterReservedResourcesAccountingAfterClusterReset(t *testing.T) {
 	for _, clusterResetErr := range []error{errLastSeqMismatch, errFirstSequenceMismatch} {
 		t.Run(clusterResetErr.Error(), func(t *testing.T) {
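
To reproduce the scenario, the new test can be run on its own with the standard Go test runner (assuming a local checkout of the nats-server repository):

go test ./server -run TestJetStreamClusterConsumerDesyncAfterErrorDuringStreamCatchup -v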

server/stream.go (+1 -1)

@@ -5665,7 +5665,7 @@ func (mset *stream) resetAndWaitOnConsumers() {
 	for _, o := range consumers {
 		if node := o.raftNode(); node != nil {
 			node.StepDown()
-			node.Delete()
+			node.Stop()
 		}
 		if o.isMonitorRunning() {
 			o.monitorWg.Wait()
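
This one-line change is the fix itself. Per the commit message, `Delete` tore down the consumer's replicated state along with its Raft node, whereas `Stop` only halts the node. The preceding `StepDown` is unchanged, so the node still relinquishes any leadership first; the state left behind lets a restarted server resume the consumer rather than rebuild it from scratch.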
