Skip to content

Commit 6602c3d

Browse files
[FIXED] AckAll on replicated Interest stream
Signed-off-by: Maurice van Veen <[email protected]>
1 parent e50fb31 commit 6602c3d

File tree

2 files changed

+74
-15
lines changed

2 files changed

+74
-15
lines changed

server/jetstream_cluster.go

+10-15
Original file line numberDiff line numberDiff line change
@@ -5097,6 +5097,16 @@ func (o *consumer) processReplicatedAck(dseq, sseq uint64) error {
50975097
// Update activity.
50985098
o.lat = time.Now()
50995099

5100+
var sagap uint64
5101+
if o.cfg.AckPolicy == AckAll {
5102+
// Always use the store state, as o.asflr is skipped ahead already.
5103+
// Capture before updating store.
5104+
state, err := o.store.State()
5105+
if err == nil {
5106+
sagap = sseq - state.AckFloor.Stream
5107+
}
5108+
}
5109+
51005110
// Do actual ack update to store.
51015111
// Always do this to have it recorded.
51025112
o.store.UpdateAcks(dseq, sseq)
@@ -5121,21 +5131,6 @@ func (o *consumer) processReplicatedAck(dseq, sseq uint64) error {
51215131
o.mu.Unlock()
51225132
return nil
51235133
}
5124-
5125-
var sagap uint64
5126-
if o.cfg.AckPolicy == AckAll {
5127-
if o.isLeader() {
5128-
sagap = sseq - o.asflr
5129-
} else {
5130-
// We are a follower so only have the store state, so read that in.
5131-
state, err := o.store.State()
5132-
if err != nil {
5133-
o.mu.Unlock()
5134-
return err
5135-
}
5136-
sagap = sseq - state.AckFloor.Stream
5137-
}
5138-
}
51395134
o.mu.Unlock()
51405135

51415136
if sagap > 1 {

server/jetstream_cluster_1_test.go

+64
Original file line numberDiff line numberDiff line change
@@ -8004,6 +8004,70 @@ func TestJetStreamClusterUpgradeConsumerVersioning(t *testing.T) {
80048004
}
80058005
}
80068006

8007+
func TestJetStreamClusterInterestPolicyAckAll(t *testing.T) {
8008+
c := createJetStreamClusterExplicit(t, "R3S", 3)
8009+
defer c.shutdown()
8010+
8011+
nc, js := jsClientConnect(t, c.randomServer())
8012+
defer nc.Close()
8013+
8014+
_, err := js.AddStream(&nats.StreamConfig{
8015+
Name: "TEST",
8016+
Retention: nats.InterestPolicy,
8017+
Subjects: []string{"foo"},
8018+
Replicas: 3,
8019+
})
8020+
require_NoError(t, err)
8021+
8022+
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
8023+
Durable: "CONSUMER",
8024+
AckPolicy: nats.AckAllPolicy,
8025+
})
8026+
require_NoError(t, err)
8027+
8028+
for i := 0; i < 100; i++ {
8029+
_, err = js.Publish("foo", []byte("ok"))
8030+
require_NoError(t, err)
8031+
}
8032+
8033+
expectedStreamMsgs := func(msgs uint64) {
8034+
t.Helper()
8035+
checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
8036+
si, err := js.StreamInfo("TEST")
8037+
if err != nil {
8038+
return err
8039+
}
8040+
if si.State.Msgs != msgs {
8041+
return fmt.Errorf("require uint64 equal, but got: %d != %d", si.State.Msgs, msgs)
8042+
}
8043+
return nil
8044+
})
8045+
}
8046+
expectedStreamMsgs(100)
8047+
8048+
for _, s := range c.servers {
8049+
acc, err := s.lookupAccount(globalAccountName)
8050+
require_NoError(t, err)
8051+
mset, err := acc.lookupStream("TEST")
8052+
require_NoError(t, err)
8053+
o := mset.lookupConsumer("CONSUMER")
8054+
require_NotNil(t, o)
8055+
o.mu.Lock()
8056+
// Ensure o.checkStateForInterestStream can't hide that the issue happened.
8057+
o.chkflr = 1000
8058+
o.mu.Unlock()
8059+
}
8060+
8061+
sub, err := js.PullSubscribe("foo", "CONSUMER")
8062+
require_NoError(t, err)
8063+
msgs, err := sub.Fetch(50)
8064+
require_NoError(t, err)
8065+
require_True(t, len(msgs) == 50)
8066+
require_NoError(t, msgs[49].AckSync())
8067+
8068+
expectedStreamMsgs(50)
8069+
}
8070+
80078071
//
80088072
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
80098073
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.

0 commit comments

Comments
 (0)