Skip to content

Commit 976a3ec

Browse files
authored
Fix goroutines
* Fix goroutines count because of history dispatch
1 parent 3096b60 commit 976a3ec

File tree

8 files changed

+29
-84
lines changed

8 files changed

+29
-84
lines changed

.vscode/launch.json

-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
"MERCURE_EXTRA_DIRECTIVES": "anonymous\nwrite_timeout 10s",
3434
"MERCURE_REDIS_ADDRESS": "localhost:6379",
3535
"MERCURE_REDIS_SUBSCRIBERS_SIZE": "10000",
36-
"MERCURE_REDIS_DISPATCHER_POOL_SIZE": "16",
3736
"MERCURE_REDIS_CHANNEL": "mercure",
3837
"GLOBAL_OPTIONS": "debug",
3938
"SERVER_NAME": ":1234",

caddy/redis.go

+6-19
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,11 @@ func init() {
1515
}
1616

1717
type Redis struct {
18-
Address string `json:"address,omitempty"`
19-
Username string `json:"username,omitempty"`
20-
Password string `json:"password,omitempty"`
21-
SubscribersSize int `json:"subscribers_size,omitempty"`
22-
DispatcherPoolSize int `json:"dispatcher_pool_size,omitempty"`
23-
RedisChannel string `json:"redis_channel,omitempty"`
18+
Address string `json:"address,omitempty"`
19+
Username string `json:"username,omitempty"`
20+
Password string `json:"password,omitempty"`
21+
SubscribersSize int `json:"subscribers_size,omitempty"`
22+
RedisChannel string `json:"redis_channel,omitempty"`
2423

2524
transport *mercure.RedisTransport
2625
transportKey string
@@ -47,7 +46,7 @@ func (r *Redis) Provision(ctx caddy.Context) error {
4746
r.transportKey = key.String()
4847

4948
destructor, _, err := TransportUsagePool.LoadOrNew(r.transportKey, func() (caddy.Destructor, error) {
50-
t, err := mercure.NewRedisTransport(ctx.Logger(), r.Address, r.Username, r.Password, r.SubscribersSize, r.DispatcherPoolSize, r.RedisChannel)
49+
t, err := mercure.NewRedisTransport(ctx.Logger(), r.Address, r.Username, r.Password, r.SubscribersSize, r.RedisChannel)
5150
if err != nil {
5251
return nil, err
5352
}
@@ -108,18 +107,6 @@ func (r *Redis) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
108107

109108
r.SubscribersSize = s
110109

111-
case "dispatcher_pool_size":
112-
if !d.NextArg() {
113-
return d.ArgErr()
114-
}
115-
116-
s, e := strconv.Atoi(replacer.ReplaceKnown(d.Val(), ""))
117-
if e != nil {
118-
return e
119-
}
120-
121-
r.DispatcherPoolSize = s
122-
123110
case "redis_channel":
124111
if !d.NextArg() {
125112
return d.ArgErr()

charts/mercure/templates/deployment.yaml

-4
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,6 @@ spec:
5656
- name: MERCURE_REDIS_SUBSCRIBERS_SIZE
5757
value: "{{ .Values.redis.subscribers_size }}"
5858
{{- end }}
59-
{{- if .Values.redis.dispatcher_pool_size }}
60-
- name: MERCURE_REDIS_DISPATCHER_POOL_SIZE
61-
value: "{{ .Values.redis.dispatcher_pool_size }}"
62-
{{- end }}
6359
{{- if .Values.redis.redis_channel }}
6460
- name: MERCURE_REDIS_CHANNEL
6561
value: "{{ .Values.redis.redis_channel }}"

charts/mercure/values.yaml

-1
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,6 @@ redis:
223223
password: ~
224224
dispatch_timer: ~
225225
subscribers_size: ~
226-
dispatcher_pool_size: ~
227226
redis_channel: mercure
228227

229228
customLabels: ~

redis.Caddyfile

-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
username {env.MERCURE_REDIS_USERNAME}
3232
password {env.MERCURE_REDIS_PASSWORD}
3333
subscribers_size {env.MERCURE_REDIS_SUBSCRIBERS_SIZE}
34-
dispatcher_pool_size {env.MERCURE_REDIS_DISPATCHER_POOL_SIZE}
3534
redis_channel {env.MERCURE_REDIS_CHANNEL}
3635
}
3736
}

redis.dev.Caddyfile

-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
username {env.MERCURE_REDIS_USERNAME}
3838
password {env.MERCURE_REDIS_PASSWORD}
3939
subscribers_size {env.MERCURE_REDIS_SUBSCRIBERS_SIZE}
40-
dispatcher_pool_size {env.MERCURE_REDIS_DISPATCHER_POOL_SIZE}
4140
redis_channel {env.MERCURE_REDIS_CHANNEL}
4241
}
4342
}

redis.go

+19-52
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,13 @@ const (
2121

2222
type RedisTransport struct {
2323
sync.RWMutex
24-
logger Logger
25-
client *redis.Client
26-
subscribers *SubscriberList
27-
dispatcherPoolSize int
28-
dispatcher chan SubscriberPayload
29-
closed chan any
30-
publishScript *redis.Script
31-
closedOnce sync.Once
32-
redisChannel string
33-
}
34-
35-
type SubscriberPayload struct {
36-
subscriber *LocalSubscriber
37-
payload Update
24+
logger Logger
25+
client *redis.Client
26+
subscribers *SubscriberList
27+
closed chan any
28+
publishScript *redis.Script
29+
closedOnce sync.Once
30+
redisChannel string
3831
}
3932

4033
func NewRedisTransport(
@@ -43,7 +36,6 @@ func NewRedisTransport(
4336
username string,
4437
password string,
4538
subscribersSize int,
46-
dispatcherPoolSize int,
4739
redisChannel string,
4840
) (*RedisTransport, error) {
4941
client := redis.NewClient(&redis.Options{
@@ -56,29 +48,26 @@ func NewRedisTransport(
5648
return nil, fmt.Errorf("failed to connect to Redis: %w", pong.Err())
5749
}
5850

59-
return NewRedisTransportInstance(logger, client, subscribersSize, dispatcherPoolSize, redisChannel)
51+
return NewRedisTransportInstance(logger, client, subscribersSize, redisChannel)
6052
}
6153

6254
func NewRedisTransportInstance(
6355
logger Logger,
6456
client *redis.Client,
6557
subscribersSize int,
66-
dispatcherPoolSize int,
6758
redisChannel string,
6859
) (*RedisTransport, error) {
6960
subscriber := client.PSubscribe(context.Background(), redisChannel)
7061

7162
subscribeCtx, subscribeCancel := context.WithCancel(context.Background())
7263

7364
transport := &RedisTransport{
74-
logger: logger,
75-
client: client,
76-
subscribers: NewSubscriberList(subscribersSize),
77-
dispatcherPoolSize: dispatcherPoolSize,
78-
publishScript: redis.NewScript(publishScript),
79-
dispatcher: make(chan SubscriberPayload),
80-
closed: make(chan any),
81-
redisChannel: redisChannel,
65+
logger: logger,
66+
client: client,
67+
subscribers: NewSubscriberList(subscribersSize),
68+
publishScript: redis.NewScript(publishScript),
69+
closed: make(chan any),
70+
redisChannel: redisChannel,
8271
}
8372

8473
go func() {
@@ -103,19 +92,6 @@ func NewRedisTransportInstance(
10392
transport.subscribe(subscribeCtx, subscribeCancel, subscriber)
10493
}()
10594

106-
wg.Add(dispatcherPoolSize)
107-
for range dispatcherPoolSize {
108-
go func() {
109-
defer wg.Done()
110-
transport.dispatch()
111-
}()
112-
}
113-
114-
go func() {
115-
wg.Wait()
116-
close(transport.dispatcher)
117-
}()
118-
11995
return transport, nil
12096
}
12197

@@ -154,10 +130,13 @@ func (t *RedisTransport) AddSubscriber(s *LocalSubscriber) error {
154130
return ErrClosedTransport
155131
default:
156132
}
133+
157134
t.Lock()
158135
t.subscribers.Add(s)
159136
t.Unlock()
160-
137+
if s.RequestLastEventID != "" {
138+
s.HistoryDispatched(EarliestLastEventID)
139+
}
161140
s.Ready()
162141

163142
return nil
@@ -232,24 +211,12 @@ func (t *RedisTransport) subscribe(ctx context.Context, cancel context.CancelFun
232211
t.Lock()
233212
for _, subscriber := range t.subscribers.MatchAny(&update) {
234213
update.Topics = topics
235-
t.dispatcher <- SubscriberPayload{subscriber, update}
214+
subscriber.Dispatch(&update, false)
236215
}
237216
t.Unlock()
238217
}
239218
}
240219

241-
func (t *RedisTransport) dispatch() {
242-
for {
243-
select {
244-
case message := <-t.dispatcher:
245-
message.subscriber.Dispatch(&message.payload, false)
246-
case <-t.closed:
247-
248-
return
249-
}
250-
}
251-
}
252-
253220
var (
254221
_ Transport = (*RedisTransport)(nil)
255222
_ TransportSubscribers = (*RedisTransport)(nil)

redis_test.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,13 @@ import (
1111
)
1212

1313
const (
14-
redisHost = "localhost:6379"
15-
redisSubscriberSize = 100000
16-
redisDispatchPoolSize = 16
17-
redisChannel = "channel"
14+
redisHost = "localhost:6379"
15+
redisSubscriberSize = 100000
16+
redisChannel = "channel"
1817
)
1918

2019
func initialize() *RedisTransport {
21-
transport, _ := NewRedisTransport(zap.NewNop(), redisHost, "", "", redisSubscriberSize, redisDispatchPoolSize, redisChannel)
20+
transport, _ := NewRedisTransport(zap.NewNop(), redisHost, "", "", redisSubscriberSize, redisChannel)
2221

2322
return transport
2423
}

0 commit comments

Comments
 (0)