Skip to content

Commit 3cae2a1

Browse files
authored
feat(fxgcppubsub): Added nack reactor (#22)
* feat(fxgcppubsub): Fixed race conditions on avro binary codec * feat(fxgcppubsub): Fixed race conditions on avro binary codec * feat(fxgcppubsub): Added nack reactor
1 parent 75a8ca9 commit 3cae2a1

File tree

7 files changed

+204
-29
lines changed

7 files changed

+204
-29
lines changed

fxgcppubsub/module_test.go

Lines changed: 85 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,8 @@ func TestFxGcpPubSubModule(t *testing.T) {
6565
fx.Populate(&publisher, &subscriber, &supervisor),
6666
).RequireStart().RequireStop()
6767

68-
t.Run("raw message", func(t *testing.T) {
69-
res, err := publisher.Publish(ctx, "raw-topic", []byte("test"))
70-
assert.NotNil(t, res)
71-
assert.NoError(t, err)
72-
73-
sid, err := res.Get(ctx)
74-
assert.NotEmpty(t, sid)
68+
t.Run("raw message ack", func(t *testing.T) {
69+
_, err := publisher.Publish(ctx, "raw-topic", []byte("test"))
7570
assert.NoError(t, err)
7671

7772
publisher.Stop()
@@ -89,17 +84,31 @@ func TestFxGcpPubSubModule(t *testing.T) {
8984
assert.NoError(t, err)
9085
})
9186

92-
t.Run("avro message", func(t *testing.T) {
93-
res, err := publisher.Publish(ctx, "avro-topic", &avro.SimpleRecord{
87+
t.Run("raw message nack", func(t *testing.T) {
88+
_, err := publisher.Publish(ctx, "raw-topic", []byte("test"))
89+
assert.NoError(t, err)
90+
91+
publisher.Stop()
92+
93+
waiter := supervisor.StartNackWaiter("raw-subscription")
94+
95+
//nolint:errcheck
96+
go subscriber.Subscribe(ctx, "raw-subscription", func(ctx context.Context, m *message.Message) {
97+
assert.Equal(t, []byte("test"), m.Data())
98+
99+
m.Nack()
100+
})
101+
102+
_, err = waiter.WaitMaxDuration(ctx, time.Second)
103+
assert.NoError(t, err)
104+
})
105+
106+
t.Run("avro message ack", func(t *testing.T) {
107+
_, err := publisher.Publish(ctx, "avro-topic", &avro.SimpleRecord{
94108
StringField: "test avro",
95109
FloatField: 12.34,
96110
BooleanField: true,
97111
})
98-
assert.NotNil(t, res)
99-
assert.NoError(t, err)
100-
101-
sid, err := res.Get(ctx)
102-
assert.NotEmpty(t, sid)
103112
assert.NoError(t, err)
104113

105114
publisher.Stop()
@@ -124,17 +133,42 @@ func TestFxGcpPubSubModule(t *testing.T) {
124133
assert.NoError(t, err)
125134
})
126135

127-
t.Run("proto message", func(t *testing.T) {
128-
res, err := publisher.Publish(ctx, "proto-topic", &proto.SimpleRecord{
136+
t.Run("avro message nack", func(t *testing.T) {
137+
_, err := publisher.Publish(ctx, "avro-topic", &avro.SimpleRecord{
138+
StringField: "test avro",
139+
FloatField: 12.34,
140+
BooleanField: true,
141+
})
142+
assert.NoError(t, err)
143+
144+
publisher.Stop()
145+
146+
waiter := supervisor.StartNackWaiter("avro-subscription")
147+
148+
//nolint:errcheck
149+
go subscriber.Subscribe(ctx, "avro-subscription", func(ctx context.Context, m *message.Message) {
150+
var out avro.SimpleRecord
151+
152+
err = m.Decode(&out)
153+
assert.NoError(t, err)
154+
155+
assert.Equal(t, "test avro", out.StringField)
156+
assert.Equal(t, float32(12.34), out.FloatField)
157+
assert.True(t, out.BooleanField)
158+
159+
m.Nack()
160+
})
161+
162+
_, err = waiter.WaitMaxDuration(ctx, time.Second)
163+
assert.NoError(t, err)
164+
})
165+
166+
t.Run("proto message ack", func(t *testing.T) {
167+
_, err := publisher.Publish(ctx, "proto-topic", &proto.SimpleRecord{
129168
StringField: "test proto",
130169
FloatField: 56.78,
131170
BooleanField: false,
132171
})
133-
assert.NotNil(t, res)
134-
assert.NoError(t, err)
135-
136-
sid, err := res.Get(ctx)
137-
assert.NotEmpty(t, sid)
138172
assert.NoError(t, err)
139173

140174
publisher.Stop()
@@ -158,4 +192,34 @@ func TestFxGcpPubSubModule(t *testing.T) {
158192
_, err = waiter.WaitMaxDuration(ctx, time.Second)
159193
assert.NoError(t, err)
160194
})
195+
196+
t.Run("proto message nack", func(t *testing.T) {
197+
_, err := publisher.Publish(ctx, "proto-topic", &proto.SimpleRecord{
198+
StringField: "test proto",
199+
FloatField: 56.78,
200+
BooleanField: false,
201+
})
202+
assert.NoError(t, err)
203+
204+
publisher.Stop()
205+
206+
waiter := supervisor.StartNackWaiter("proto-subscription")
207+
208+
//nolint:errcheck
209+
go subscriber.Subscribe(ctx, "proto-subscription", func(ctx context.Context, m *message.Message) {
210+
var out proto.SimpleRecord
211+
212+
err = m.Decode(&out)
213+
assert.NoError(t, err)
214+
215+
assert.Equal(t, "test proto", out.StringField)
216+
assert.Equal(t, float32(56.78), out.FloatField)
217+
assert.False(t, out.BooleanField)
218+
219+
m.Nack()
220+
})
221+
222+
_, err = waiter.WaitMaxDuration(ctx, time.Second)
223+
assert.NoError(t, err)
224+
})
161225
}

fxgcppubsub/reactor/ack/reactor.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ func NewAckReactor(supervisor AckSupervisor) *AckReactor {
2020
func (r *AckReactor) FuncNames() []string {
2121
return []string{
2222
"Acknowledge",
23+
"ModifyAckDeadline",
2324
}
2425
}
2526

@@ -29,5 +30,9 @@ func (r *AckReactor) React(req any) (bool, any, error) {
2930
r.supervisor.StopAckWaiter(ackReq.Subscription, ackReq.AckIds, nil)
3031
}
3132

33+
if ackReq, ok := req.(*pubsubpb.ModifyAckDeadlineRequest); ok {
34+
r.supervisor.StopNackWaiter(ackReq.Subscription, ackReq.AckIds, nil)
35+
}
36+
3237
return false, nil, nil
3338
}

fxgcppubsub/reactor/ack/reactor_test.go

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,17 @@ func TestAckReactor(t *testing.T) {
3636
react := ack.NewAckReactor(sup)
3737

3838
t.Run("func names", func(t *testing.T) {
39-
assert.Equal(t, []string{"Acknowledge"}, react.FuncNames())
39+
assert.Equal(
40+
t,
41+
[]string{
42+
"Acknowledge",
43+
"ModifyAckDeadline",
44+
},
45+
react.FuncNames(),
46+
)
4047
})
4148

42-
t.Run("react", func(t *testing.T) {
49+
t.Run("react to ack", func(t *testing.T) {
4350
req := &pubsubpb.AcknowledgeRequest{
4451
Subscription: subscription.NormalizeSubscriptionName("test-project", "test-subscription"),
4552
AckIds: []string{"test-id"},
@@ -59,4 +66,25 @@ func TestAckReactor(t *testing.T) {
5966
assert.NoError(t, err)
6067
assert.Equal(t, []string{"test-id"}, data)
6168
})
69+
70+
t.Run("react to nack", func(t *testing.T) {
71+
req := &pubsubpb.ModifyAckDeadlineRequest{
72+
Subscription: subscription.NormalizeSubscriptionName("test-project", "test-subscription"),
73+
AckIds: []string{"test-id"},
74+
}
75+
76+
waiter := sup.StartNackWaiter("test-subscription")
77+
78+
go func() {
79+
rHandled, rRet, rErr := react.React(req)
80+
81+
assert.False(t, rHandled)
82+
assert.Nil(t, rRet)
83+
assert.NoError(t, rErr)
84+
}()
85+
86+
data, err := waiter.WaitMaxDuration(context.Background(), 1*time.Millisecond)
87+
assert.NoError(t, err)
88+
assert.Equal(t, []string{"test-id"}, data)
89+
})
6290
}

fxgcppubsub/reactor/ack/supervisor.go

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,71 @@
11
package ack
22

33
import (
4+
"fmt"
5+
46
"github.com/ankorstore/yokai-contrib/fxgcppubsub/reactor"
57
"github.com/ankorstore/yokai-contrib/fxgcppubsub/subscription"
68
"github.com/ankorstore/yokai/config"
79
)
810

11+
const (
12+
Ack = "ack"
13+
Nack = "nack"
14+
)
15+
916
var _ AckSupervisor = (*DefaultAckSupervisor)(nil)
1017

18+
// AckSupervisor is a reactor supervisor that reacts to acks ans nacks.
1119
type AckSupervisor interface {
1220
StartAckWaiter(subscriptionID string) *reactor.Waiter
1321
StopAckWaiter(subscriptionName string, ackIDs []string, err error)
22+
StartNackWaiter(subscriptionID string) *reactor.Waiter
23+
StopNackWaiter(subscriptionName string, ackIDs []string, err error)
1424
}
1525

26+
// DefaultAckSupervisor is the default AckSupervisor implementation.
1627
type DefaultAckSupervisor struct {
1728
supervisor reactor.WaiterSupervisor
1829
config *config.Config
1930
}
2031

32+
// NewDefaultAckSupervisor returns a new DefaultAckSupervisor instance.
2133
func NewDefaultAckSupervisor(supervisor reactor.WaiterSupervisor, config *config.Config) *DefaultAckSupervisor {
2234
return &DefaultAckSupervisor{
2335
supervisor: supervisor,
2436
config: config,
2537
}
2638
}
2739

40+
// StartAckWaiter starts an ack waiter on a provided subscriptionID.
2841
func (s *DefaultAckSupervisor) StartAckWaiter(subscriptionID string) *reactor.Waiter {
42+
return s.startWaiter(subscriptionID, Ack)
43+
}
44+
45+
// StopAckWaiter stop an ack waiter for a provided subscriptionName.
46+
func (s *DefaultAckSupervisor) StopAckWaiter(subscriptionName string, ackIDs []string, err error) {
47+
s.stopWaiter(subscriptionName, Ack, ackIDs, err)
48+
}
49+
50+
// StartNackWaiter starts a nack waiter on a provided subscriptionID.
51+
func (s *DefaultAckSupervisor) StartNackWaiter(subscriptionID string) *reactor.Waiter {
52+
return s.startWaiter(subscriptionID, Nack)
53+
}
54+
55+
// StopNackWaiter stop a nack waiter for a provided subscriptionName.
56+
func (s *DefaultAckSupervisor) StopNackWaiter(subscriptionName string, ackIDs []string, err error) {
57+
s.stopWaiter(subscriptionName, Nack, ackIDs, err)
58+
}
59+
60+
func (s *DefaultAckSupervisor) startWaiter(subscriptionID string, kind string) *reactor.Waiter {
2961
subscriptionName := subscription.NormalizeSubscriptionName(
3062
s.config.GetString("modules.gcppubsub.project.id"),
3163
subscriptionID,
3264
)
3365

34-
return s.supervisor.StartWaiter(subscriptionName)
66+
return s.supervisor.StartWaiter(fmt.Sprintf("%s::%s", kind, subscriptionName))
3567
}
3668

37-
func (s *DefaultAckSupervisor) StopAckWaiter(subscriptionName string, ackIDs []string, err error) {
38-
s.supervisor.StopWaiter(subscriptionName, ackIDs, err)
69+
func (s *DefaultAckSupervisor) stopWaiter(subscriptionName string, kind string, ackIDs []string, err error) {
70+
s.supervisor.StopWaiter(fmt.Sprintf("%s::%s", kind, subscriptionName), ackIDs, err)
3971
}

fxgcppubsub/reactor/ack/supervisor_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,4 +68,41 @@ func TestAckSupervisor(t *testing.T) {
6868
assert.Equal(t, assert.AnError, err)
6969
assert.Equal(t, []string{"test-id"}, data)
7070
})
71+
72+
t.Run("wait for nack", func(t *testing.T) {
73+
waiter := supervisor.StartNackWaiter("test-subscription")
74+
75+
go func() {
76+
time.Sleep(1 * time.Millisecond)
77+
78+
supervisor.StopNackWaiter(
79+
subscription.NormalizeSubscriptionName("test-project", "test-subscription"),
80+
[]string{"test-id"},
81+
nil,
82+
)
83+
}()
84+
85+
data, err := waiter.WaitMaxDuration(context.Background(), 5*time.Millisecond)
86+
assert.NoError(t, err)
87+
assert.Equal(t, []string{"test-id"}, data)
88+
})
89+
90+
t.Run("wait for nack with error", func(t *testing.T) {
91+
waiter := supervisor.StartNackWaiter("test-subscription")
92+
93+
go func() {
94+
time.Sleep(1 * time.Millisecond)
95+
96+
supervisor.StopNackWaiter(
97+
subscription.NormalizeSubscriptionName("test-project", "test-subscription"),
98+
[]string{"test-id"},
99+
assert.AnError,
100+
)
101+
}()
102+
103+
data, err := waiter.WaitMaxDuration(context.Background(), 5*time.Millisecond)
104+
assert.Error(t, err)
105+
assert.Equal(t, assert.AnError, err)
106+
assert.Equal(t, []string{"test-id"}, data)
107+
})
71108
}

fxgcppubsub/reactor/log/reactor.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package log
22

33
import (
4+
"fmt"
5+
46
"github.com/ankorstore/yokai/log"
57
)
68

@@ -49,7 +51,7 @@ func (r *LogReactor) FuncNames() []string {
4951

5052
// React is the reactor logic.
5153
func (r *LogReactor) React(req any) (bool, any, error) {
52-
r.logger.Debug().Interface("req", req).Msg("log reactor")
54+
r.logger.Debug().Str("type", fmt.Sprintf("%T", req)).Interface("data", req).Msg("log reactor")
5355

5456
return false, nil, nil
5557
}

fxgcppubsub/reactor/log/reactor_test.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package log_test
33
import (
44
"testing"
55

6+
"cloud.google.com/go/pubsub/apiv1/pubsubpb"
67
"github.com/ankorstore/yokai-contrib/fxgcppubsub/reactor/log"
78
yokailog "github.com/ankorstore/yokai/log"
89
"github.com/ankorstore/yokai/log/logtest"
@@ -61,15 +62,21 @@ func TestLogReactor(t *testing.T) {
6162
t.Run("react", func(t *testing.T) {
6263
t.Parallel()
6364

64-
rHandled, rRet, rErr := react.React("test")
65+
req := &pubsubpb.AcknowledgeRequest{
66+
Subscription: "test-subscription",
67+
AckIds: []string{"test-id"},
68+
}
69+
70+
rHandled, rRet, rErr := react.React(req)
6571

6672
assert.False(t, rHandled)
6773
assert.Nil(t, rRet)
6874
assert.NoError(t, rErr)
6975

7076
logtest.AssertHasLogRecord(t, logBuffer, map[string]interface{}{
7177
"level": "debug",
72-
"req": "test",
78+
"type": "*pubsubpb.AcknowledgeRequest",
79+
"data": "map[ack_ids:[test-id] subscription:test-subscription]",
7380
"message": "log reactor",
7481
})
7582
})

0 commit comments

Comments
 (0)