Skip to content

Commit 04c089c

Browse files
[FIXED] Deny stream/consumer updates if all peers offline (#6856)
A stream/consumer update would be allowed even if all peers are offline. For example: - create R1 stream/consumer - shutdown the server hosting it - update to R3 That would result in data loss, because only the offline server will have the data. So deny the update request until there's at least one online peer. Signed-off-by: Maurice van Veen <[email protected]>
2 parents b60cf79 + 80b99af commit 04c089c

File tree

2 files changed

+85
-0
lines changed

2 files changed

+85
-0
lines changed

server/jetstream_cluster.go

+13
Original file line numberDiff line numberDiff line change
@@ -6289,6 +6289,13 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
62896289
return
62906290
}
62916291

6292+
// Don't allow updating if all peers are offline.
6293+
if s.allPeersOffline(osa.Group) {
6294+
resp.Error = NewJSStreamOfflineError()
6295+
s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp), nil, errRespDelay)
6296+
return
6297+
}
6298+
62926299
// Update asset version metadata.
62936300
setStaticStreamMetadata(cfg)
62946301

@@ -7413,6 +7420,12 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
74137420
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
74147421
return
74157422
}
7423+
// Don't allow updating if all peers are offline.
7424+
if s.allPeersOffline(ca.Group) {
7425+
resp.Error = NewJSConsumerOfflineError()
7426+
s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp), nil, errRespDelay)
7427+
return
7428+
}
74167429
} else {
74177430
// Initialize/update asset version metadata.
74187431
// First time creating this consumer, or updating.

server/jetstream_cluster_1_test.go

+72
Original file line numberDiff line numberDiff line change
@@ -8392,6 +8392,78 @@ func TestJetStreamClusterConsumerActiveAfterDidNotDeliverOverRoute(t *testing.T)
83928392
require_True(t, ci.PushBound)
83938393
}
83948394

8395+
func TestJetStreamClusterOfflineR1StreamDenyUpdate(t *testing.T) {
8396+
c := createJetStreamClusterExplicit(t, "R3S", 3)
8397+
defer c.shutdown()
8398+
8399+
nc, js := jsClientConnect(t, c.randomServer())
8400+
defer nc.Close()
8401+
8402+
cfg := &nats.StreamConfig{
8403+
Name: "TEST",
8404+
Subjects: []string{"foo"},
8405+
Replicas: 1,
8406+
}
8407+
_, err := js.AddStream(cfg)
8408+
require_NoError(t, err)
8409+
8410+
// Stop current R1 stream leader.
8411+
sl := c.streamLeader(globalAccountName, "TEST")
8412+
sl.Shutdown()
8413+
nc.Close()
8414+
nc, js = jsClientConnect(t, c.randomServer())
8415+
defer nc.Close()
8416+
8417+
// Wait for meta leader, so we can send an update.
8418+
c.waitOnLeader()
8419+
8420+
_, err = js.StreamInfo("TEST")
8421+
require_Error(t, err, NewJSStreamOfflineError())
8422+
8423+
cfg.Replicas = 3
8424+
_, err = js.UpdateStream(cfg)
8425+
require_Error(t, err, NewJSStreamOfflineError())
8426+
}
8427+
8428+
func TestJetStreamClusterOfflineR1ConsumerDenyUpdate(t *testing.T) {
8429+
c := createJetStreamClusterExplicit(t, "R3S", 3)
8430+
defer c.shutdown()
8431+
8432+
nc, js := jsClientConnect(t, c.randomServer())
8433+
defer nc.Close()
8434+
8435+
_, err := js.AddStream(&nats.StreamConfig{
8436+
Name: "TEST",
8437+
Subjects: []string{"foo"},
8438+
Replicas: 3,
8439+
})
8440+
require_NoError(t, err)
8441+
8442+
cfg := &nats.ConsumerConfig{
8443+
Durable: "CONSUMER",
8444+
Replicas: 1,
8445+
}
8446+
_, err = js.AddConsumer("TEST", cfg)
8447+
require_NoError(t, err)
8448+
8449+
// Stop current R1 consumer leader.
8450+
cl := c.consumerLeader(globalAccountName, "TEST", "CONSUMER")
8451+
cl.Shutdown()
8452+
nc.Close()
8453+
nc, js = jsClientConnect(t, c.randomServer())
8454+
defer nc.Close()
8455+
8456+
// Wait for meta leader, so we can send an update.
8457+
c.waitOnLeader()
8458+
8459+
_, err = js.ConsumerInfo("TEST", "CONSUMER")
8460+
require_Error(t, err, NewJSConsumerOfflineError())
8461+
8462+
cfg.Replicas = 3
8463+
_, err = js.UpdateConsumer("TEST", cfg)
8464+
require_Error(t, err, NewJSConsumerOfflineError())
8465+
}
8466+
83958467
//
83968468
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
83978469
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.

0 commit comments

Comments
 (0)