Skip to content

Commit 719bb1c

Browse files
author
Tamer Eldeeb
authored
Pass config to cadence matching and frontend services during initialization (#296)
Replace constant config with a config struct that can be passed to the service during initialization. Issue #286
1 parent 76ddae6 commit 719bb1c

File tree

10 files changed

+142
-100
lines changed

10 files changed

+142
-100
lines changed

cmd/server/server.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,15 @@
2121
package main
2222

2323
import (
24+
"log"
25+
"time"
26+
2427
"github.com/uber/cadence/common"
2528
"github.com/uber/cadence/common/service"
2629
"github.com/uber/cadence/common/service/config"
2730
"github.com/uber/cadence/service/frontend"
2831
"github.com/uber/cadence/service/history"
2932
"github.com/uber/cadence/service/matching"
30-
"log"
31-
"time"
3233
)
3334

3435
type (
@@ -107,11 +108,11 @@ func (s *server) startService() common.Daemon {
107108

108109
switch s.name {
109110
case frontendService:
110-
daemon = frontend.NewService(&params)
111+
daemon = frontend.NewService(&params, frontend.NewConfig())
111112
case historyService:
112113
daemon = history.NewService(&params)
113114
case matchingService:
114-
daemon = matching.NewService(&params)
115+
daemon = matching.NewService(&params, matching.NewConfig())
115116
}
116117

117118
go execute(daemon, s.doneC)

host/onebox.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ func (c *cadenceImpl) startFrontend(logger bark.Logger, rpHosts []string, startW
155155
params.CassandraConfig.Hosts = "127.0.0.1"
156156
service := service.New(params)
157157
var thriftServices []thrift.TChanServer
158-
c.frontendHandler, thriftServices = frontend.NewWorkflowHandler(service, c.metadataMgr, c.historyMgr, c.visibilityMgr)
158+
c.frontendHandler, thriftServices = frontend.NewWorkflowHandler(service, frontend.NewConfig(), c.metadataMgr, c.historyMgr, c.visibilityMgr)
159159
err := c.frontendHandler.Start(thriftServices)
160160
if err != nil {
161161
c.logger.WithField("error", err).Fatal("Failed to start frontend")
@@ -203,7 +203,7 @@ func (c *cadenceImpl) startMatching(logger bark.Logger, taskMgr persistence.Task
203203
params.CassandraConfig.NumHistoryShards = c.numberOfHistoryShards
204204
service := service.New(params)
205205
var thriftServices []thrift.TChanServer
206-
c.matchingHandler, thriftServices = matching.NewHandler(taskMgr, service)
206+
c.matchingHandler, thriftServices = matching.NewHandler(service, matching.NewConfig(), taskMgr)
207207
c.matchingHandler.Start(thriftServices)
208208
startWG.Done()
209209
<-c.shutdownCh

service/frontend/handler.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ type (
5959
metricsClient metrics.Client
6060
startWG sync.WaitGroup
6161
rateLimiter common.TokenBucket
62+
config *Config
6263
service.Service
6364
}
6465

@@ -69,12 +70,6 @@ type (
6970
}
7071
)
7172

72-
const (
73-
defaultVisibilityMaxPageSize = 1000
74-
defaultHistoryMaxPageSize = 1000
75-
defaultRPS = 1200 // This limit is based on experimental runs.
76-
)
77-
7873
var (
7974
errDomainNotSet = &gen.BadRequestError{Message: "Domain not set on request."}
8075
errTaskTokenNotSet = &gen.BadRequestError{Message: "Task token not set on request."}
@@ -89,17 +84,18 @@ var (
8984

9085
// NewWorkflowHandler creates a thrift handler for the cadence service
9186
func NewWorkflowHandler(
92-
sVice service.Service, metadataMgr persistence.MetadataManager,
87+
sVice service.Service, config *Config, metadataMgr persistence.MetadataManager,
9388
historyMgr persistence.HistoryManager, visibilityMgr persistence.VisibilityManager) (*WorkflowHandler, []thrift.TChanServer) {
9489
handler := &WorkflowHandler{
9590
Service: sVice,
91+
config: config,
9692
metadataMgr: metadataMgr,
9793
historyMgr: historyMgr,
9894
visibitiltyMgr: visibilityMgr,
9995
tokenSerializer: common.NewJSONTaskTokenSerializer(),
10096
hSerializerFactory: persistence.NewHistorySerializerFactory(),
10197
domainCache: cache.NewDomainCache(metadataMgr, sVice.GetLogger()),
102-
rateLimiter: common.NewTokenBucket(defaultRPS, common.NewRealTimeSource()),
98+
rateLimiter: common.NewTokenBucket(config.RPS, common.NewRealTimeSource()),
10399
}
104100
// prevent us from trying to serve requests before handler's Start() is complete
105101
handler.startWG.Add(1)
@@ -380,7 +376,7 @@ func (wh *WorkflowHandler) PollForDecisionTask(
380376
if matchingResp.IsSetWorkflowExecution() {
381377
// Non-empty response. Get the history
382378
history, persistenceToken, err = wh.getHistory(
383-
info.ID, *matchingResp.GetWorkflowExecution(), matchingResp.GetStartedEventId()+1, defaultHistoryMaxPageSize, nil)
379+
info.ID, *matchingResp.GetWorkflowExecution(), matchingResp.GetStartedEventId()+1, wh.config.DefaultHistoryMaxPageSize, nil)
384380
if err != nil {
385381
return nil, wh.error(err, scope)
386382
}
@@ -667,7 +663,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
667663
}
668664

669665
if !getRequest.IsSetMaximumPageSize() || getRequest.GetMaximumPageSize() == 0 {
670-
getRequest.MaximumPageSize = common.Int32Ptr(defaultHistoryMaxPageSize)
666+
getRequest.MaximumPageSize = common.Int32Ptr(wh.config.DefaultHistoryMaxPageSize)
671667
}
672668

673669
domainName := getRequest.GetDomain()
@@ -911,7 +907,7 @@ func (wh *WorkflowHandler) ListOpenWorkflowExecutions(ctx thrift.Context,
911907
}
912908

913909
if !listRequest.IsSetMaximumPageSize() || listRequest.GetMaximumPageSize() == 0 {
914-
listRequest.MaximumPageSize = common.Int32Ptr(defaultVisibilityMaxPageSize)
910+
listRequest.MaximumPageSize = common.Int32Ptr(wh.config.DefaultVisibilityMaxPageSize)
915911
}
916912

917913
domainName := listRequest.GetDomain()
@@ -999,7 +995,7 @@ func (wh *WorkflowHandler) ListClosedWorkflowExecutions(ctx thrift.Context,
999995
}
1000996

1001997
if !listRequest.IsSetMaximumPageSize() || listRequest.GetMaximumPageSize() == 0 {
1002-
listRequest.MaximumPageSize = common.Int32Ptr(defaultVisibilityMaxPageSize)
998+
listRequest.MaximumPageSize = common.Int32Ptr(wh.config.DefaultVisibilityMaxPageSize)
1003999
}
10041000

10051001
domainName := listRequest.GetDomain()

service/frontend/service.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,34 @@ import (
2626
"github.com/uber/cadence/common/service"
2727
)
2828

29+
// Config represents configuration for cadence-frontend service
30+
type Config struct {
31+
DefaultVisibilityMaxPageSize int32
32+
DefaultHistoryMaxPageSize int32
33+
RPS int
34+
}
35+
36+
// NewConfig returns new service config with default values
37+
func NewConfig() *Config {
38+
return &Config{
39+
DefaultVisibilityMaxPageSize: 1000,
40+
DefaultHistoryMaxPageSize: 1000,
41+
RPS: 1200, // This limit is based on experimental runs.
42+
}
43+
}
44+
2945
// Service represents the cadence-frontend service
3046
type Service struct {
3147
stopC chan struct{}
48+
config *Config
3249
params *service.BootstrapParams
3350
}
3451

3552
// NewService builds a new cadence-frontend service
36-
func NewService(params *service.BootstrapParams) common.Daemon {
53+
func NewService(params *service.BootstrapParams, config *Config) common.Daemon {
3754
return &Service{
3855
params: params,
56+
config: config,
3957
stopC: make(chan struct{}),
4058
}
4159
}
@@ -89,7 +107,7 @@ func (s *Service) Start() {
89107

90108
history = persistence.NewHistoryPersistenceClient(history, base.GetMetricsClient())
91109

92-
handler, tchanServers := NewWorkflowHandler(base, metadata, history, visibility)
110+
handler, tchanServers := NewWorkflowHandler(base, s.config, metadata, history, visibility)
93111
handler.Start(tchanServers)
94112

95113
log.Infof("%v started", common.FrontendServiceName)

service/matching/handler.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,16 +40,18 @@ var _ m.TChanMatchingService = (*Handler)(nil)
4040
type Handler struct {
4141
taskPersistence persistence.TaskManager
4242
engine Engine
43+
config *Config
4344
metricsClient metrics.Client
4445
startWG sync.WaitGroup
4546
service.Service
4647
}
4748

4849
// NewHandler creates a thrift handler for the history service
49-
func NewHandler(taskPersistence persistence.TaskManager, sVice service.Service) (*Handler, []thrift.TChanServer) {
50+
func NewHandler(sVice service.Service, config *Config, taskPersistence persistence.TaskManager) (*Handler, []thrift.TChanServer) {
5051
handler := &Handler{
5152
Service: sVice,
5253
taskPersistence: taskPersistence,
54+
config: config,
5355
}
5456
// prevent us from trying to serve requests before matching engine is started and ready
5557
handler.startWG.Add(1)
@@ -64,7 +66,7 @@ func (h *Handler) Start(thriftService []thrift.TChanServer) error {
6466
return err
6567
}
6668
h.metricsClient = h.Service.GetMetricsClient()
67-
h.engine = NewEngine(h.taskPersistence, history, h.Service.GetLogger(), h.Service.GetMetricsClient())
69+
h.engine = NewEngine(h.taskPersistence, history, h.config, h.Service.GetLogger(), h.Service.GetMetricsClient())
6870
h.startWG.Done()
6971
return nil
7072
}

service/matching/matchingEngine.go

Lines changed: 16 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ package matching
2323
import (
2424
"errors"
2525
"sync"
26-
"time"
2726

2827
"github.com/pborman/uuid"
2928
"github.com/uber-common/bark"
@@ -35,7 +34,6 @@ import (
3534
workflow "github.com/uber/cadence/.gen/go/shared"
3635
"github.com/uber/cadence/client/history"
3736
"github.com/uber/cadence/common"
38-
"github.com/uber/cadence/common/backoff"
3937
"github.com/uber/cadence/common/logging"
4038
"github.com/uber/cadence/common/metrics"
4139
"github.com/uber/cadence/common/persistence"
@@ -46,15 +44,14 @@ import (
4644
// TODO: Switch implementation from lock/channel based to a partitioned agent
4745
// to simplify code and reduce possiblity of synchronization errors.
4846
type matchingEngineImpl struct {
49-
taskManager persistence.TaskManager
50-
historyService history.Client
51-
tokenSerializer common.TaskTokenSerializer
52-
rangeSize int64
53-
logger bark.Logger
54-
metricsClient metrics.Client
55-
longPollExpirationInterval time.Duration
56-
taskListsLock sync.RWMutex // locks mutation of taskLists
57-
taskLists map[taskListID]taskListManager // Convert to LRU cache
47+
taskManager persistence.TaskManager
48+
historyService history.Client
49+
tokenSerializer common.TaskTokenSerializer
50+
logger bark.Logger
51+
metricsClient metrics.Client
52+
taskListsLock sync.RWMutex // locks mutation of taskLists
53+
taskLists map[taskListID]taskListManager // Convert to LRU cache
54+
config *Config
5855
}
5956

6057
type taskListID struct {
@@ -63,20 +60,14 @@ type taskListID struct {
6360
taskType int
6461
}
6562

66-
const (
67-
defaultLongPollExpirationInterval = time.Minute
68-
emptyGetRetryInitialInterval = 100 * time.Millisecond
69-
emptyGetRetryMaxInterval = 1 * time.Second
70-
)
71-
7263
var (
7364
// EmptyPollForDecisionTaskResponse is the response when there are no decision tasks to hand out
7465
emptyPollForDecisionTaskResponse = m.NewPollForDecisionTaskResponse()
7566
// EmptyPollForActivityTaskResponse is the response when there are no activity tasks to hand out
7667
emptyPollForActivityTaskResponse = workflow.NewPollForActivityTaskResponse()
7768
persistenceOperationRetryPolicy = common.CreatePersistanceRetryPolicy()
7869
historyServiceOperationRetryPolicy = common.CreateHistoryServiceRetryPolicy()
79-
emptyGetTasksRetryPolicy = createEmptyGetTasksRetryPolicy()
70+
8071
// ErrNoTasks is exported temporarily for integration test
8172
ErrNoTasks = errors.New("No tasks")
8273
errPumpClosed = errors.New("Task list pump closed its channel")
@@ -100,20 +91,20 @@ var _ Engine = (*matchingEngineImpl)(nil) // Asserts that interface is indeed im
10091
// NewEngine creates an instance of matching engine
10192
func NewEngine(taskManager persistence.TaskManager,
10293
historyService history.Client,
94+
config *Config,
10395
logger bark.Logger,
10496
metricsClient metrics.Client) Engine {
10597

10698
return &matchingEngineImpl{
107-
taskManager: taskManager,
108-
historyService: historyService,
109-
tokenSerializer: common.NewJSONTaskTokenSerializer(),
110-
taskLists: make(map[taskListID]taskListManager),
111-
rangeSize: defaultRangeSize,
112-
longPollExpirationInterval: defaultLongPollExpirationInterval,
99+
taskManager: taskManager,
100+
historyService: historyService,
101+
tokenSerializer: common.NewJSONTaskTokenSerializer(),
102+
taskLists: make(map[taskListID]taskListManager),
113103
logger: logger.WithFields(bark.Fields{
114104
logging.TagWorkflowComponent: logging.TagValueMatchingEngineComponent,
115105
}),
116106
metricsClient: metricsClient,
107+
config: config,
117108
}
118109
}
119110

@@ -162,7 +153,7 @@ func (e *matchingEngineImpl) getTaskListManager(taskList *taskListID) (taskListM
162153
}
163154
e.taskListsLock.RUnlock()
164155
logging.LogTaskListLoadingEvent(e.logger, taskList.taskListName, taskList.taskType)
165-
mgr := newTaskListManager(e, taskList)
156+
mgr := newTaskListManager(e, taskList, e.config)
166157
e.taskListsLock.Lock()
167158
if result, ok := e.taskLists[*taskList]; ok {
168159
e.taskListsLock.Unlock()
@@ -411,13 +402,6 @@ func newTaskListID(domainID, taskListName string, taskType int) *taskListID {
411402
return &taskListID{domainID: domainID, taskListName: taskListName, taskType: taskType}
412403
}
413404

414-
func createEmptyGetTasksRetryPolicy() backoff.RetryPolicy {
415-
policy := backoff.NewExponentialRetryPolicy(emptyGetRetryInitialInterval)
416-
policy.SetMaximumInterval(emptyGetRetryMaxInterval)
417-
418-
return policy
419-
}
420-
421405
func workflowExecutionPtr(execution workflow.WorkflowExecution) *workflow.WorkflowExecution {
422406
return &execution
423407
}

0 commit comments

Comments
 (0)