Skip to content

Commit 1737b97

Browse files
authored
error-out if we can't Subscribe to membershipResolver (#6290)
Should never happen, but it returns error on duplicate subscription
1 parent e86b73e commit 1737b97

File tree

2 files changed

+50
-1
lines changed

2 files changed

+50
-1
lines changed

service/matching/handler/membership.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,10 @@ func (e *matchingEngineImpl) subscribeToMembershipChanges() {
5858
}
5959

6060
listener := make(chan *membership.ChangedEvent, subscriptionBufferSize)
61-
e.membershipResolver.Subscribe(service.Matching, "matching-engine", listener)
61+
if err := e.membershipResolver.Subscribe(service.Matching, "matching-engine", listener); err != nil {
62+
e.logger.Error("Failed to subscribe to membership updates")
63+
return
64+
}
6265

6366
for {
6467
select {

service/matching/handler/membership_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
package handler
2424

2525
import (
26+
"errors"
2627
"sync"
2728
"testing"
2829
"time"
@@ -32,12 +33,14 @@ import (
3233
"github.com/uber-go/tally"
3334

3435
"github.com/uber/cadence/client/history"
36+
"github.com/uber/cadence/common"
3537
"github.com/uber/cadence/common/cache"
3638
"github.com/uber/cadence/common/clock"
3739
"github.com/uber/cadence/common/cluster"
3840
"github.com/uber/cadence/common/dynamicconfig"
3941
cadence_errors "github.com/uber/cadence/common/errors"
4042
"github.com/uber/cadence/common/log/loggerimpl"
43+
"github.com/uber/cadence/common/log/testlogger"
4144
"github.com/uber/cadence/common/membership"
4245
"github.com/uber/cadence/common/metrics"
4346
"github.com/uber/cadence/common/persistence"
@@ -274,6 +277,49 @@ func TestSubscriptionAndErrorReturned(t *testing.T) {
274277
e.subscribeToMembershipChanges()
275278
}
276279

280+
func TestSubscribeToMembershipChangesQuitsIfSubscribeFails(t *testing.T) {
281+
ctrl := gomock.NewController(t)
282+
m := membership.NewMockResolver(ctrl)
283+
284+
logger, logs := testlogger.NewObserved(t)
285+
286+
shutdownWG := sync.WaitGroup{}
287+
shutdownWG.Add(1)
288+
289+
e := matchingEngineImpl{
290+
shutdownCompletion: &shutdownWG,
291+
membershipResolver: m,
292+
config: &config.Config{
293+
EnableTasklistOwnershipGuard: func(opts ...dynamicconfig.FilterOption) bool { return true },
294+
},
295+
shutdown: make(chan struct{}),
296+
logger: logger,
297+
}
298+
299+
// this should trigger the error case on a membership event
300+
m.EXPECT().WhoAmI().Return(membership.HostInfo{}, assert.AnError).AnyTimes()
301+
302+
m.EXPECT().Subscribe(service.Matching, "matching-engine", gomock.Any()).
303+
Return(errors.New("matching-engine is already subscribed to updates"))
304+
305+
go func() {
306+
// then call stop so the test can finish
307+
time.Sleep(time.Second)
308+
e.Stop()
309+
}()
310+
311+
e.subscribeToMembershipChanges()
312+
// check we emitted error-message
313+
filteredLogs := logs.FilterMessage("Failed to subscribe to membership updates")
314+
assert.Equal(t, 1, filteredLogs.Len(), "error-message should be produced")
315+
316+
assert.True(
317+
t,
318+
common.AwaitWaitGroup(&shutdownWG, 10*time.Second),
319+
"subscribeToMembershipChanges should immediately shut down because of critical error",
320+
)
321+
}
322+
277323
func TestGetTasklistManagerShutdownScenario(t *testing.T) {
278324
ctrl := gomock.NewController(t)
279325
m := membership.NewMockResolver(ctrl)

0 commit comments

Comments
 (0)