Skip to content

Commit 33dd132

Browse files
authored
fix(fxgcppubsub): Fixed nack reactor (#24)
* fix(fxgcppubsub): Fixed nack reactor * fix(fxgcppubsub): Fixed nack reactor * fix(fxgcppubsub): Fixed nack reactor * fix(fxgcppubsub): Fixed nack reactor * fix(fxgcppubsub): Fixed nack reactor
1 parent 4d680e5 commit 33dd132

File tree

3 files changed

+6
-4
lines changed

3 files changed

+6
-4
lines changed

fxgcppubsub/reactor/ack/reactor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ func (r *AckReactor) React(req any) (bool, any, error) {
3030
r.supervisor.StopAckWaiter(ackReq.Subscription, ackReq.AckIds, nil)
3131
}
3232

33-
if ackReq, ok := req.(*pubsubpb.ModifyAckDeadlineRequest); ok {
34-
r.supervisor.StopNackWaiter(ackReq.Subscription, ackReq.AckIds, nil)
33+
if modReq, ok := req.(*pubsubpb.ModifyAckDeadlineRequest); ok {
34+
r.supervisor.StopNackWaiter(modReq.Subscription, modReq.AckIds, nil)
3535
}
3636

3737
return false, nil, nil

fxgcppubsub/reactor/ack/reactor_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func TestAckReactor(t *testing.T) {
6262
assert.NoError(t, rErr)
6363
}()
6464

65-
data, err := waiter.WaitMaxDuration(context.Background(), 1*time.Millisecond)
65+
data, err := waiter.WaitMaxDuration(context.Background(), 5*time.Millisecond)
6666
assert.NoError(t, err)
6767
assert.Equal(t, []string{"test-id"}, data)
6868
})
@@ -83,7 +83,7 @@ func TestAckReactor(t *testing.T) {
8383
assert.NoError(t, rErr)
8484
}()
8585

86-
data, err := waiter.WaitMaxDuration(context.Background(), 1*time.Millisecond)
86+
data, err := waiter.WaitMaxDuration(context.Background(), 5*time.Millisecond)
8787
assert.NoError(t, err)
8888
assert.Equal(t, []string{"test-id"}, data)
8989
})

fxgcppubsub/reactor/supervisor.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,5 +44,7 @@ func (s *DefaultWaiterSupervisor) StopWaiter(target string, data any, err error)
4444

4545
if waiter, found := s.waiters[target]; found {
4646
waiter.Stop(data, err)
47+
48+
delete(s.waiters, target)
4749
}
4850
}

0 commit comments

Comments
 (0)