Skip to content

Commit 18dec15

Browse files
[FIXED] Filestore PurgeEx by sequence with interior delete gap (#6861)
Filestore would incorrectly set `mb.first.seq` after a `PurgeEx` using a sequence within an interior delete gap. It would set the first sequence to whatever is provided as the purge sequence, instead of properly moving the first sequence up and taking deletes into account. Signed-off-by: Maurice van Veen <[email protected]>
2 parents 04c089c + cc67f2f commit 18dec15

File tree

2 files changed

+94
-2
lines changed

2 files changed

+94
-2
lines changed

server/filestore.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -8332,8 +8332,13 @@ func (fs *fileStore) compact(seq uint64) (uint64, error) {
83328332
} else {
83338333
// Make sure to sync changes.
83348334
smb.needSync = true
8335-
// Update fs first seq and time.
8336-
atomic.StoreUint64(&smb.first.seq, seq-1) // Just for start condition for selectNextFirst.
8335+
// Just for start condition for selectNextFirst.
8336+
if smb.first.seq < seq {
8337+
atomic.StoreUint64(&smb.first.seq, seq-1)
8338+
} else {
8339+
// selectNextFirst always adds 1, so need to subtract 1 here.
8340+
atomic.StoreUint64(&smb.first.seq, smb.first.seq-1)
8341+
}
83378342
smb.selectNextFirst()
83388343

83398344
fs.state.FirstSeq = atomic.LoadUint64(&smb.first.seq)

server/jetstream_test.go

+87
Original file line numberDiff line numberDiff line change
@@ -19831,3 +19831,90 @@ func TestJetStreamDirectGetSubjectDeleteMarker(t *testing.T) {
1983119831
})
1983219832
}
1983319833
}
19834+
19835+
func TestJetStreamPurgeExSeqSimple(t *testing.T) {
19836+
for _, storageType := range []nats.StorageType{nats.FileStorage, nats.MemoryStorage} {
19837+
t.Run(storageType.String(), func(t *testing.T) {
19838+
s := RunBasicJetStreamServer(t)
19839+
defer s.Shutdown()
19840+
19841+
nc, js := jsClientConnect(t, s)
19842+
defer nc.Close()
19843+
19844+
_, err := js.AddStream(&nats.StreamConfig{
19845+
Name: "TEST",
19846+
Subjects: []string{"test"},
19847+
Storage: storageType,
19848+
})
19849+
require_NoError(t, err)
19850+
19851+
data := make([]byte, 1024)
19852+
for i := 0; i < 10_000; i++ {
19853+
_, err = js.Publish("test", data)
19854+
require_NoError(t, err)
19855+
}
19856+
19857+
si, err := js.StreamInfo("TEST")
19858+
require_NoError(t, err)
19859+
require_Equal(t, si.State.Msgs, 10_000)
19860+
19861+
require_NoError(t, js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 9_000}))
19862+
19863+
si, err = js.StreamInfo("TEST")
19864+
require_NoError(t, err)
19865+
require_Equal(t, si.State.Msgs, 1_001)
19866+
require_Equal(t, si.State.NumDeleted, 0)
19867+
require_Equal(t, si.State.FirstSeq, 9_000)
19868+
require_Equal(t, si.State.LastSeq, 10_000)
19869+
})
19870+
}
19871+
}
19872+
19873+
func TestJetStreamPurgeExSeqInInteriorDeleteGap(t *testing.T) {
19874+
for _, storageType := range []nats.StorageType{nats.FileStorage, nats.MemoryStorage} {
19875+
t.Run(storageType.String(), func(t *testing.T) {
19876+
s := RunBasicJetStreamServer(t)
19877+
defer s.Shutdown()
19878+
19879+
nc, js := jsClientConnect(t, s)
19880+
defer nc.Close()
19881+
19882+
_, err := js.AddStream(&nats.StreamConfig{
19883+
Name: "TEST",
19884+
Subjects: []string{"test.*"},
19885+
Storage: storageType,
19886+
})
19887+
require_NoError(t, err)
19888+
19889+
data := make([]byte, 1024)
19890+
_, err = js.Publish("test.start", data)
19891+
require_NoError(t, err)
19892+
for i := 0; i < 10_000; i++ {
19893+
_, err = js.Publish("test.mid", data)
19894+
require_NoError(t, err)
19895+
}
19896+
_, err = js.Publish("test.end", data)
19897+
require_NoError(t, err)
19898+
19899+
si, err := js.StreamInfo("TEST")
19900+
require_NoError(t, err)
19901+
require_Equal(t, si.State.Msgs, 10_002)
19902+
19903+
require_NoError(t, js.PurgeStream("TEST", &nats.StreamPurgeRequest{Subject: "test.mid"}))
19904+
19905+
si, err = js.StreamInfo("TEST")
19906+
require_NoError(t, err)
19907+
require_Equal(t, si.State.Msgs, 2)
19908+
require_Equal(t, si.State.NumDeleted, 10_000)
19909+
19910+
require_NoError(t, js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 9_000}))
19911+
19912+
si, err = js.StreamInfo("TEST")
19913+
require_NoError(t, err)
19914+
require_Equal(t, si.State.Msgs, 1)
19915+
require_Equal(t, si.State.NumDeleted, 0)
19916+
require_Equal(t, si.State.FirstSeq, 10_002)
19917+
require_Equal(t, si.State.LastSeq, 10_002)
19918+
})
19919+
}
19920+
}

0 commit comments

Comments
 (0)