Skip to content

Commit 886a2dc

Browse files
author
Tamer Eldeeb
authored
Pass config to cadence history service during initialization (#301)
Issue #286
1 parent aafd57d commit 886a2dc

23 files changed

+279
-212
lines changed

cmd/server/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func (s *server) startService() common.Daemon {
110110
case frontendService:
111111
daemon = frontend.NewService(&params, frontend.NewConfig())
112112
case historyService:
113-
daemon = history.NewService(&params)
113+
daemon = history.NewService(&params, history.NewConfig(s.cfg.Cassandra.NumHistoryShards))
114114
case matchingService:
115115
daemon = matching.NewService(&params, matching.NewConfig())
116116
}

host/onebox.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,8 @@ func (c *cadenceImpl) startHistory(logger bark.Logger, shardMgr persistence.Shar
180180
service := service.New(params)
181181
var thriftServices []thrift.TChanServer
182182
var handler *history.Handler
183-
handler, thriftServices = history.NewHandler(service, shardMgr, metadataMgr, visibilityMgr, historyMgr, executionMgrFactory,
184-
c.numberOfHistoryShards)
183+
handler, thriftServices = history.NewHandler(service, history.NewConfig(c.numberOfHistoryShards), shardMgr, metadataMgr,
184+
visibilityMgr, historyMgr, executionMgrFactory)
185185
handler.Start(thriftServices)
186186
c.historyHandlers = append(c.historyHandlers, handler)
187187
}

service/history/handler.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ type Handler struct {
5252
tokenSerializer common.TaskTokenSerializer
5353
startWG sync.WaitGroup
5454
metricsClient metrics.Client
55+
config *Config
5556
service.Service
5657
}
5758

@@ -64,17 +65,17 @@ var (
6465
)
6566

6667
// NewHandler creates a thrift handler for the history service
67-
func NewHandler(sVice service.Service, shardManager persistence.ShardManager, metadataMgr persistence.MetadataManager,
68-
visibilityMgr persistence.VisibilityManager, historyMgr persistence.HistoryManager,
69-
executionMgrFactory persistence.ExecutionManagerFactory, numberOfShards int) (*Handler, []thrift.TChanServer) {
68+
func NewHandler(sVice service.Service, config *Config, shardManager persistence.ShardManager,
69+
metadataMgr persistence.MetadataManager, visibilityMgr persistence.VisibilityManager,
70+
historyMgr persistence.HistoryManager, executionMgrFactory persistence.ExecutionManagerFactory) (*Handler, []thrift.TChanServer) {
7071
handler := &Handler{
7172
Service: sVice,
73+
config: config,
7274
shardManager: shardManager,
7375
metadataMgr: metadataMgr,
7476
historyMgr: historyMgr,
7577
visibilityMgr: visibilityMgr,
7678
executionMgrFactory: executionMgrFactory,
77-
numberOfShards: numberOfShards,
7879
tokenSerializer: common.NewJSONTaskTokenSerializer(),
7980
}
8081
// prevent us from trying to serve requests before shard controller is started and ready
@@ -102,8 +103,8 @@ func (h *Handler) Start(thriftService []thrift.TChanServer) error {
102103
h.Service.GetLogger().Fatalf("Unable to get history service resolver.")
103104
}
104105
h.hServiceResolver = hServiceResolver
105-
h.controller = newShardController(h.numberOfShards, h.GetHostInfo(), hServiceResolver, h.shardManager, h.historyMgr,
106-
h.executionMgrFactory, h, h.GetLogger(), h.GetMetricsClient())
106+
h.controller = newShardController(h.GetHostInfo(), hServiceResolver, h.shardManager, h.historyMgr,
107+
h.executionMgrFactory, h, h.config, h.GetLogger(), h.GetMetricsClient())
107108
h.controller.Start()
108109
h.metricsClient = h.GetMetricsClient()
109110
h.startWG.Done()

service/history/historyBuilder_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func (s *historyBuilderSuite) SetupTest() {
5858
// Have to define our overridden assertions in the test setup. If we did it earlier, s.T() will return nil
5959
s.Assertions = require.New(s.T())
6060
s.domainID = "history-builder-test-domain"
61-
s.msBuilder = newMutableStateBuilder(s.logger)
61+
s.msBuilder = newMutableStateBuilder(NewConfig(1), s.logger)
6262
s.builder = newHistoryBuilder(s.msBuilder, s.logger)
6363
}
6464

service/history/historyCache.go

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
package history
2222

2323
import (
24-
"time"
25-
2624
workflow "github.com/uber/cadence/.gen/go/shared"
2725
"github.com/uber/cadence/common/backoff"
2826
"github.com/uber/cadence/common/cache"
@@ -33,12 +31,6 @@ import (
3331
"github.com/uber/cadence/common"
3432
)
3533

36-
const (
37-
historyCacheInitialSize = 256
38-
historyCacheMaxSize = 1 * 1024
39-
historyCacheTTL time.Duration = time.Hour
40-
)
41-
4234
type (
4335
releaseWorkflowExecutionFunc func()
4436

@@ -48,6 +40,7 @@ type (
4840
executionManager persistence.ExecutionManager
4941
disabled bool
5042
logger bark.Logger
43+
config *Config
5144
}
5245
)
5346

@@ -57,19 +50,21 @@ var (
5750
ErrTryLock = &workflow.InternalServiceError{Message: "Failed to acquire lock, backoff and retry"}
5851
)
5952

60-
func newHistoryCache(maxSize int, shard ShardContext, logger bark.Logger) *historyCache {
53+
func newHistoryCache(shard ShardContext, logger bark.Logger) *historyCache {
6154
opts := &cache.Options{}
62-
opts.InitialCapacity = historyCacheInitialSize
63-
opts.TTL = historyCacheTTL
55+
config := shard.GetConfig()
56+
opts.InitialCapacity = config.HistoryCacheInitialSize
57+
opts.TTL = config.HistoryCacheTTL
6458
opts.Pin = true
6559

6660
return &historyCache{
67-
Cache: cache.New(maxSize, opts),
61+
Cache: cache.New(config.HistoryCacheMaxSize, opts),
6862
shard: shard,
6963
executionManager: shard.GetExecutionManager(),
7064
logger: logger.WithFields(bark.Fields{
7165
logging.TagWorkflowComponent: logging.TagValueHistoryCacheComponent,
7266
}),
67+
config: config,
7368
}
7469
}
7570

service/history/historyCache_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,18 +66,19 @@ func (s *historyCacheSuite) SetupTest() {
6666
transferSequenceNumber: 1,
6767
executionManager: s.mockExecutionMgr,
6868
shardManager: &mocks.ShardManager{},
69-
rangeSize: defaultRangeSize,
7069
maxTransferSequenceNumber: 100000,
7170
closeCh: make(chan int, 100),
71+
config: NewConfig(1),
7272
logger: s.logger,
7373
metricsClient: metrics.NewClient(tally.NoopScope, metrics.History),
7474
}
75-
s.cache = newHistoryCache(historyCacheMaxSize, s.mockShard, s.logger)
75+
s.cache = newHistoryCache(s.mockShard, s.logger)
7676
}
7777

7878
func (s *historyCacheSuite) TestHistoryCachePinning() {
79+
s.mockShard.GetConfig().HistoryCacheMaxSize = 2
7980
domain := "test_domain"
80-
s.cache = newHistoryCache(2, s.mockShard, s.logger)
81+
s.cache = newHistoryCache(s.mockShard, s.logger)
8182
we := workflow.WorkflowExecution{
8283
WorkflowId: common.StringPtr("wf-cache-test"),
8384
RunId: common.StringPtr(uuid.New()),

service/history/historyEngine.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func NewEngineWithShardContext(shard ShardContext, metadataMgr persistence.Metad
8989
logger := shard.GetLogger()
9090
executionManager := shard.GetExecutionManager()
9191
historyManager := shard.GetHistoryManager()
92-
historyCache := newHistoryCache(historyCacheMaxSize, shard, logger)
92+
historyCache := newHistoryCache(shard, logger)
9393
domainCache := cache.NewDomainCache(metadataMgr, logger)
9494
txProcessor := newTransferQueueProcessor(shard, visibilityMgr, matching, historyClient, historyCache, domainCache)
9595
historyEngImpl := &historyEngineImpl{
@@ -166,7 +166,7 @@ func (e *historyEngineImpl) StartWorkflowExecution(startRequest *h.StartWorkflow
166166

167167
// Generate first decision task event.
168168
taskList := request.GetTaskList().GetName()
169-
msBuilder := newMutableStateBuilder(e.logger)
169+
msBuilder := newMutableStateBuilder(e.shard.GetConfig(), e.logger)
170170
startedEvent := msBuilder.AddWorkflowExecutionStartedEvent(domainID, workflowExecution, request)
171171
if startedEvent == nil {
172172
return nil, &workflow.InternalServiceError{Message: "Failed to add workflow execution started event."}
@@ -1513,7 +1513,7 @@ func (e *historyEngineImpl) getTimerBuilder(we *workflow.WorkflowExecution) *tim
15131513
logging.TagWorkflowExecutionID: we.GetWorkflowId(),
15141514
logging.TagWorkflowRunID: we.GetRunId(),
15151515
})
1516-
return newTimerBuilder(lg, common.NewRealTimeSource())
1516+
return newTimerBuilder(e.shard.GetConfig(), lg, common.NewRealTimeSource())
15171517
}
15181518

15191519
func (s *shardContextWrapper) UpdateWorkflowExecution(request *persistence.UpdateWorkflowExecutionRequest) error {

service/history/historyEngine2_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ type (
5858
mockShardManager *mocks.ShardManager
5959
shardClosedCh chan int
6060
eventSerializer historyEventSerializer
61+
config *Config
6162
logger bark.Logger
6263
}
6364
)
@@ -75,6 +76,7 @@ func (s *engine2Suite) SetupSuite() {
7576
l := log.New()
7677
l.Level = log.DebugLevel
7778
s.logger = bark.NewLoggerFromLogrus(l)
79+
s.config = NewConfig(1)
7880
}
7981

8082
func (s *engine2Suite) TearDownSuite() {
@@ -101,14 +103,14 @@ func (s *engine2Suite) SetupTest() {
101103
executionManager: s.mockExecutionMgr,
102104
historyMgr: s.mockHistoryMgr,
103105
shardManager: s.mockShardManager,
104-
rangeSize: defaultRangeSize,
105106
maxTransferSequenceNumber: 100000,
106107
closeCh: s.shardClosedCh,
108+
config: s.config,
107109
logger: s.logger,
108110
metricsClient: metrics.NewClient(tally.NoopScope, metrics.History),
109111
}
110112

111-
historyCache := newHistoryCache(historyCacheMaxSize, mockShard, s.logger)
113+
historyCache := newHistoryCache(mockShard, s.logger)
112114
domainCache := cache.NewDomainCache(s.mockMetadataMgr, s.logger)
113115
txProcessor := newTransferQueueProcessor(mockShard, s.mockVisibilityMgr, s.mockMatchingClient, s.mockHistoryClient, historyCache, domainCache)
114116
h := &historyEngineImpl{
@@ -605,7 +607,7 @@ func (s *engine2Suite) TestRequestCancelWorkflowExecutionFail() {
605607

606608
func (s *engine2Suite) createExecutionStartedState(we workflow.WorkflowExecution, tl, identity string,
607609
startDecision bool) *mutableStateBuilder {
608-
msBuilder := newMutableStateBuilder(s.logger)
610+
msBuilder := newMutableStateBuilder(s.config, s.logger)
609611
addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity)
610612
scheduleEvent, _ := addDecisionTaskScheduledEvent(msBuilder)
611613
if startDecision {
@@ -641,7 +643,7 @@ func (s *engine2Suite) TestRespondDecisionTaskCompletedRecordMarkerDecision() {
641643
markerDetails := []byte("marker details")
642644
markerName := "marker name"
643645

644-
msBuilder := newMutableStateBuilder(bark.NewLoggerFromLogrus(log.New()))
646+
msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New()))
645647
addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity)
646648
scheduleEvent, _ := addDecisionTaskScheduledEvent(msBuilder)
647649
addDecisionTaskStartedEvent(msBuilder, scheduleEvent.GetEventId(), tl, identity)

0 commit comments

Comments
 (0)