Skip to content

Commit 877ccae

Browse files
authored
Create interface and mock for matcher (#6374)
* Create interface and mock for matcher
1 parent 1ba0d71 commit 877ccae

File tree

6 files changed

+197
-33
lines changed

6 files changed

+197
-33
lines changed

service/matching/tasklist/interfaces.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
// SOFTWARE.
2222

2323
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination interfaces_mock.go github.com/uber/cadence/service/matching/tasklist Manager
24+
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination interfaces_mock.go github.com/uber/cadence/service/matching/tasklist TaskMatcher
2425
package tasklist
2526

2627
import (
@@ -57,4 +58,16 @@ type (
5758
GetTaskListKind() types.TaskListKind
5859
TaskListID() *Identifier
5960
}
61+
62+
TaskMatcher interface {
63+
DisconnectBlockedPollers()
64+
Offer(ctx context.Context, task *InternalTask) (bool, error)
65+
OfferOrTimeout(ctx context.Context, startT time.Time, task *InternalTask) (bool, error)
66+
OfferQuery(ctx context.Context, task *InternalTask) (*types.QueryWorkflowResponse, error)
67+
MustOffer(ctx context.Context, task *InternalTask) error
68+
Poll(ctx context.Context, isolationGroup string) (*InternalTask, error)
69+
PollForQuery(ctx context.Context) (*InternalTask, error)
70+
UpdateRatelimit(rps *float64)
71+
Rate() float64
72+
}
6073
)

service/matching/tasklist/interfaces_mock.go

Lines changed: 150 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/matching/tasklist/matcher.go

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ import (
3737
"github.com/uber/cadence/service/matching/event"
3838
)
3939

40-
// TaskMatcher matches a task producer with a task consumer
40+
// taskMatcherImpl matches a task producer with a task consumer
4141
// Producers are usually rpc calls from history or taskReader
4242
// that drains backlog from db. Consumers are the task list pollers
43-
type TaskMatcher struct {
43+
type taskMatcherImpl struct {
4444
log log.Logger
4545
// synchronous task channel to match producer/consumer for any isolation group
4646
// tasks having no isolation requirement are added to this channel
@@ -81,7 +81,7 @@ func newTaskMatcher(
8181
isolationGroups []string,
8282
log log.Logger,
8383
tasklist *Identifier,
84-
tasklistKind types.TaskListKind) *TaskMatcher {
84+
tasklistKind types.TaskListKind) TaskMatcher {
8585
dPtr := config.TaskDispatchRPS
8686
limiter := quotas.NewRateLimiter(&dPtr, config.TaskDispatchRPSTTL, config.MinTaskThrottlingBurstSize())
8787
isolatedTaskC := make(map[string]chan *InternalTask)
@@ -91,7 +91,7 @@ func newTaskMatcher(
9191

9292
cancelCtx, cancelFunc := context.WithCancel(context.Background())
9393

94-
return &TaskMatcher{
94+
return &taskMatcherImpl{
9595
log: log,
9696
limiter: limiter,
9797
scope: scope,
@@ -108,7 +108,7 @@ func newTaskMatcher(
108108
}
109109

110110
// DisconnectBlockedPollers gradually disconnects pollers which are blocked on long polling
111-
func (tm *TaskMatcher) DisconnectBlockedPollers() {
111+
func (tm *taskMatcherImpl) DisconnectBlockedPollers() {
112112
tm.cancelFunc()
113113
}
114114

@@ -141,7 +141,7 @@ func (tm *TaskMatcher) DisconnectBlockedPollers() {
141141
// - ratelimit is exceeded (does not apply to query task)
142142
// - context deadline is exceeded
143143
// - task is matched and consumer returns error in response channel
144-
func (tm *TaskMatcher) Offer(ctx context.Context, task *InternalTask) (bool, error) {
144+
func (tm *taskMatcherImpl) Offer(ctx context.Context, task *InternalTask) (bool, error) {
145145
startT := time.Now()
146146
if !task.IsForwarded() {
147147
err := tm.ratelimit(ctx)
@@ -213,15 +213,16 @@ func (tm *TaskMatcher) Offer(ctx context.Context, task *InternalTask) (bool, err
213213
task.IsForwarded() { // task came from a child partition
214214
// a forwarded backlog task from a child partition, block trying
215215
// to match with a poller until ctx timeout
216-
return tm.offerOrTimeout(ctx, startT, task)
216+
return tm.OfferOrTimeout(ctx, startT, task)
217217
}
218218
}
219219

220220
return false, nil
221221
}
222222
}
223223

224-
func (tm *TaskMatcher) offerOrTimeout(ctx context.Context, startT time.Time, task *InternalTask) (bool, error) {
224+
// OfferOrTimeout offers a task to a poller and blocks until a poller picks up the task or context timeouts
225+
func (tm *taskMatcherImpl) OfferOrTimeout(ctx context.Context, startT time.Time, task *InternalTask) (bool, error) {
225226
select {
226227
case tm.getTaskC(task) <- task: // poller picked up the task
227228
if task.ResponseC != nil {
@@ -242,7 +243,7 @@ func (tm *TaskMatcher) offerOrTimeout(ctx context.Context, startT time.Time, tas
242243
// OfferQuery will either match task to local poller or will forward query task.
243244
// Local match is always attempted before forwarding is attempted. If local match occurs
244245
// response and error are both nil, if forwarding occurs then response or error is returned.
245-
func (tm *TaskMatcher) OfferQuery(ctx context.Context, task *InternalTask) (*types.QueryWorkflowResponse, error) {
246+
func (tm *taskMatcherImpl) OfferQuery(ctx context.Context, task *InternalTask) (*types.QueryWorkflowResponse, error) {
246247
select {
247248
case tm.queryTaskC <- task:
248249
<-task.ResponseC
@@ -278,7 +279,7 @@ func (tm *TaskMatcher) OfferQuery(ctx context.Context, task *InternalTask) (*typ
278279

279280
// MustOffer blocks until a consumer is found to handle this task
280281
// Returns error only when context is canceled, expired or the ratelimit is set to zero (allow nothing)
281-
func (tm *TaskMatcher) MustOffer(ctx context.Context, task *InternalTask) error {
282+
func (tm *taskMatcherImpl) MustOffer(ctx context.Context, task *InternalTask) error {
282283
e := event.E{
283284
TaskListName: tm.tasklist.GetName(),
284285
TaskListType: tm.tasklist.GetType(),
@@ -390,7 +391,7 @@ forLoop:
390391
// On success, the returned task could be a query task or a regular task
391392
// Returns ErrNoTasks when context deadline is exceeded
392393
// Returns ErrMatcherClosed when matching is closed
393-
func (tm *TaskMatcher) Poll(ctx context.Context, isolationGroup string) (*InternalTask, error) {
394+
func (tm *taskMatcherImpl) Poll(ctx context.Context, isolationGroup string) (*InternalTask, error) {
394395
startT := time.Now()
395396
isolatedTaskC, ok := tm.isolatedTaskC[isolationGroup]
396397
if !ok && isolationGroup != "" {
@@ -429,7 +430,7 @@ func (tm *TaskMatcher) Poll(ctx context.Context, isolationGroup string) (*Intern
429430
// PollForQuery blocks until a *query* task is found or context deadline is exceeded
430431
// Returns ErrNoTasks when context deadline is exceeded
431432
// Returns ErrMatcherClosed when matching is closed
432-
func (tm *TaskMatcher) PollForQuery(ctx context.Context) (*InternalTask, error) {
433+
func (tm *taskMatcherImpl) PollForQuery(ctx context.Context) (*InternalTask, error) {
433434
startT := time.Now()
434435
// try local match first without blocking until context timeout
435436
if task, err := tm.pollNonBlocking(ctx, nil, nil, tm.queryTaskC); err == nil {
@@ -443,7 +444,7 @@ func (tm *TaskMatcher) PollForQuery(ctx context.Context) (*InternalTask, error)
443444
}
444445

445446
// UpdateRatelimit updates the task dispatch rate
446-
func (tm *TaskMatcher) UpdateRatelimit(rps *float64) {
447+
func (tm *taskMatcherImpl) UpdateRatelimit(rps *float64) {
447448
if rps == nil {
448449
return
449450
}
@@ -457,11 +458,11 @@ func (tm *TaskMatcher) UpdateRatelimit(rps *float64) {
457458
}
458459

459460
// Rate returns the current rate at which tasks are dispatched
460-
func (tm *TaskMatcher) Rate() float64 {
461+
func (tm *taskMatcherImpl) Rate() float64 {
461462
return float64(tm.limiter.Limit())
462463
}
463464

464-
func (tm *TaskMatcher) pollOrForward(
465+
func (tm *taskMatcherImpl) pollOrForward(
465466
ctx context.Context,
466467
startT time.Time,
467468
isolationGroup string,
@@ -546,7 +547,7 @@ func (tm *TaskMatcher) pollOrForward(
546547
}
547548
}
548549

549-
func (tm *TaskMatcher) poll(
550+
func (tm *taskMatcherImpl) poll(
550551
ctx context.Context,
551552
startT time.Time,
552553
isolatedTaskC <-chan *InternalTask,
@@ -610,7 +611,7 @@ func (tm *TaskMatcher) poll(
610611
}
611612
}
612613

613-
func (tm *TaskMatcher) pollLocalWait(
614+
func (tm *taskMatcherImpl) pollLocalWait(
614615
ctx context.Context,
615616
isolatedTaskC <-chan *InternalTask,
616617
taskC <-chan *InternalTask,
@@ -670,7 +671,7 @@ func (tm *TaskMatcher) pollLocalWait(
670671
}
671672
}
672673

673-
func (tm *TaskMatcher) pollNonBlocking(
674+
func (tm *taskMatcherImpl) pollNonBlocking(
674675
ctx context.Context,
675676
isolatedTaskC <-chan *InternalTask,
676677
taskC <-chan *InternalTask,
@@ -736,21 +737,21 @@ func (tm *TaskMatcher) pollNonBlocking(
736737
}
737738
}
738739

739-
func (tm *TaskMatcher) fwdrPollReqTokenC(isolationGroup string) <-chan *ForwarderReqToken {
740+
func (tm *taskMatcherImpl) fwdrPollReqTokenC(isolationGroup string) <-chan *ForwarderReqToken {
740741
if tm.fwdr == nil {
741742
return noopForwarderTokenC
742743
}
743744
return tm.fwdr.PollReqTokenC(isolationGroup)
744745
}
745746

746-
func (tm *TaskMatcher) fwdrAddReqTokenC() <-chan *ForwarderReqToken {
747+
func (tm *taskMatcherImpl) fwdrAddReqTokenC() <-chan *ForwarderReqToken {
747748
if tm.fwdr == nil {
748749
return noopForwarderTokenC
749750
}
750751
return tm.fwdr.AddReqTokenC()
751752
}
752753

753-
func (tm *TaskMatcher) ratelimit(ctx context.Context) error {
754+
func (tm *taskMatcherImpl) ratelimit(ctx context.Context) error {
754755
err := tm.limiter.Wait(ctx)
755756
if errors.Is(err, clock.ErrCannotWait) {
756757
// "err != ctx.Err()" may also be correct, as that would mean "gave up due to context".
@@ -763,11 +764,11 @@ func (tm *TaskMatcher) ratelimit(ctx context.Context) error {
763764
return err // nil if success, non-nil if canceled
764765
}
765766

766-
func (tm *TaskMatcher) isForwardingAllowed() bool {
767+
func (tm *taskMatcherImpl) isForwardingAllowed() bool {
767768
return tm.fwdr != nil
768769
}
769770

770-
func (tm *TaskMatcher) getTaskC(task *InternalTask) chan<- *InternalTask {
771+
func (tm *taskMatcherImpl) getTaskC(task *InternalTask) chan<- *InternalTask {
771772
taskC := tm.taskC
772773
if isolatedTaskC, ok := tm.isolatedTaskC[task.isolationGroup]; ok && task.isolationGroup != "" {
773774
taskC = isolatedTaskC

0 commit comments

Comments
 (0)