@@ -25,18 +25,12 @@ type RedisTransport struct {
25
25
client * redis.Client
26
26
subscribers * SubscriberList
27
27
dispatcherPoolSize int
28
- dispatcher chan SubscriberPayload
29
28
closed chan any
30
29
publishScript * redis.Script
31
30
closedOnce sync.Once
32
31
redisChannel string
33
32
}
34
33
35
- type SubscriberPayload struct {
36
- subscriber * LocalSubscriber
37
- payload Update
38
- }
39
-
40
34
func NewRedisTransport (
41
35
logger Logger ,
42
36
address string ,
@@ -76,7 +70,6 @@ func NewRedisTransportInstance(
76
70
subscribers : NewSubscriberList (subscribersSize ),
77
71
dispatcherPoolSize : dispatcherPoolSize ,
78
72
publishScript : redis .NewScript (publishScript ),
79
- dispatcher : make (chan SubscriberPayload ),
80
73
closed : make (chan any ),
81
74
redisChannel : redisChannel ,
82
75
}
@@ -103,19 +96,6 @@ func NewRedisTransportInstance(
103
96
transport .subscribe (subscribeCtx , subscribeCancel , subscriber )
104
97
}()
105
98
106
- wg .Add (dispatcherPoolSize )
107
- for range dispatcherPoolSize {
108
- go func () {
109
- defer wg .Done ()
110
- transport .dispatch ()
111
- }()
112
- }
113
-
114
- go func () {
115
- wg .Wait ()
116
- close (transport .dispatcher )
117
- }()
118
-
119
99
return transport , nil
120
100
}
121
101
@@ -232,24 +212,12 @@ func (t *RedisTransport) subscribe(ctx context.Context, cancel context.CancelFun
232
212
t .Lock ()
233
213
for _ , subscriber := range t .subscribers .MatchAny (& update ) {
234
214
update .Topics = topics
235
- t . dispatcher <- SubscriberPayload { subscriber , update }
215
+ subscriber . Dispatch ( & update , false )
236
216
}
237
217
t .Unlock ()
238
218
}
239
219
}
240
220
241
- func (t * RedisTransport ) dispatch () {
242
- for {
243
- select {
244
- case message := <- t .dispatcher :
245
- message .subscriber .Dispatch (& message .payload , false )
246
- case <- t .closed :
247
-
248
- return
249
- }
250
- }
251
- }
252
-
253
221
var (
254
222
_ Transport = (* RedisTransport )(nil )
255
223
_ TransportSubscribers = (* RedisTransport )(nil )
0 commit comments