Skip to content

Commit cc67f2f

Browse files
[FIXED] Filestore PurgeEx by sequence with interior delete gap
Signed-off-by: Maurice van Veen <[email protected]>
1 parent b60cf79 commit cc67f2f

File tree

2 files changed

+94
-2
lines changed

2 files changed

+94
-2
lines changed

server/filestore.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -8332,8 +8332,13 @@ func (fs *fileStore) compact(seq uint64) (uint64, error) {
83328332
} else {
83338333
// Make sure to sync changes.
83348334
smb.needSync = true
8335-
// Update fs first seq and time.
8336-
atomic.StoreUint64(&smb.first.seq, seq-1) // Just for start condition for selectNextFirst.
8335+
// Just for start condition for selectNextFirst.
8336+
if smb.first.seq < seq {
8337+
atomic.StoreUint64(&smb.first.seq, seq-1)
8338+
} else {
8339+
// selectNextFirst always adds 1, so need to subtract 1 here.
8340+
atomic.StoreUint64(&smb.first.seq, smb.first.seq-1)
8341+
}
83378342
smb.selectNextFirst()
83388343

83398344
fs.state.FirstSeq = atomic.LoadUint64(&smb.first.seq)

server/jetstream_test.go

+87
Original file line numberDiff line numberDiff line change
@@ -19831,3 +19831,90 @@ func TestJetStreamDirectGetSubjectDeleteMarker(t *testing.T) {
1983119831
})
1983219832
}
1983319833
}
19834+
19835+
func TestJetStreamPurgeExSeqSimple(t *testing.T) {
19836+
for _, storageType := range []nats.StorageType{nats.FileStorage, nats.MemoryStorage} {
19837+
t.Run(storageType.String(), func(t *testing.T) {
19838+
s := RunBasicJetStreamServer(t)
19839+
defer s.Shutdown()
19840+
19841+
nc, js := jsClientConnect(t, s)
19842+
defer nc.Close()
19843+
19844+
_, err := js.AddStream(&nats.StreamConfig{
19845+
Name: "TEST",
19846+
Subjects: []string{"test"},
19847+
Storage: storageType,
19848+
})
19849+
require_NoError(t, err)
19850+
19851+
data := make([]byte, 1024)
19852+
for i := 0; i < 10_000; i++ {
19853+
_, err = js.Publish("test", data)
19854+
require_NoError(t, err)
19855+
}
19856+
19857+
si, err := js.StreamInfo("TEST")
19858+
require_NoError(t, err)
19859+
require_Equal(t, si.State.Msgs, 10_000)
19860+
19861+
require_NoError(t, js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 9_000}))
19862+
19863+
si, err = js.StreamInfo("TEST")
19864+
require_NoError(t, err)
19865+
require_Equal(t, si.State.Msgs, 1_001)
19866+
require_Equal(t, si.State.NumDeleted, 0)
19867+
require_Equal(t, si.State.FirstSeq, 9_000)
19868+
require_Equal(t, si.State.LastSeq, 10_000)
19869+
})
19870+
}
19871+
}
19872+
19873+
func TestJetStreamPurgeExSeqInInteriorDeleteGap(t *testing.T) {
19874+
for _, storageType := range []nats.StorageType{nats.FileStorage, nats.MemoryStorage} {
19875+
t.Run(storageType.String(), func(t *testing.T) {
19876+
s := RunBasicJetStreamServer(t)
19877+
defer s.Shutdown()
19878+
19879+
nc, js := jsClientConnect(t, s)
19880+
defer nc.Close()
19881+
19882+
_, err := js.AddStream(&nats.StreamConfig{
19883+
Name: "TEST",
19884+
Subjects: []string{"test.*"},
19885+
Storage: storageType,
19886+
})
19887+
require_NoError(t, err)
19888+
19889+
data := make([]byte, 1024)
19890+
_, err = js.Publish("test.start", data)
19891+
require_NoError(t, err)
19892+
for i := 0; i < 10_000; i++ {
19893+
_, err = js.Publish("test.mid", data)
19894+
require_NoError(t, err)
19895+
}
19896+
_, err = js.Publish("test.end", data)
19897+
require_NoError(t, err)
19898+
19899+
si, err := js.StreamInfo("TEST")
19900+
require_NoError(t, err)
19901+
require_Equal(t, si.State.Msgs, 10_002)
19902+
19903+
require_NoError(t, js.PurgeStream("TEST", &nats.StreamPurgeRequest{Subject: "test.mid"}))
19904+
19905+
si, err = js.StreamInfo("TEST")
19906+
require_NoError(t, err)
19907+
require_Equal(t, si.State.Msgs, 2)
19908+
require_Equal(t, si.State.NumDeleted, 10_000)
19909+
19910+
require_NoError(t, js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 9_000}))
19911+
19912+
si, err = js.StreamInfo("TEST")
19913+
require_NoError(t, err)
19914+
require_Equal(t, si.State.Msgs, 1)
19915+
require_Equal(t, si.State.NumDeleted, 0)
19916+
require_Equal(t, si.State.FirstSeq, 10_002)
19917+
require_Equal(t, si.State.LastSeq, 10_002)
19918+
})
19919+
}
19920+
}

0 commit comments

Comments
 (0)