Skip to content

Commit 1f6917c

Browse files
authored
persistence: expose close methods to cleanup gocql session conns (#240)
1 parent 7eacb42 commit 1f6917c

20 files changed

+135
-25
lines changed

common/mocks/ExecutionManager.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ type ExecutionManager struct {
2828
mock.Mock
2929
}
3030

31+
// Close provides a mock function with given fields:
32+
func (_m *ExecutionManager) Close() {
33+
_m.Called()
34+
}
35+
3136
// CompleteTimerTask provides a mock function with given fields: request
3237
func (_m *ExecutionManager) CompleteTimerTask(request *persistence.CompleteTimerTaskRequest) error {
3338
ret := _m.Called(request)

common/mocks/HistoryManager.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ type HistoryManager struct {
2828
mock.Mock
2929
}
3030

31+
// Close provides a mock function with given fields:
32+
func (_m *HistoryManager) Close() {
33+
_m.Called()
34+
}
35+
3136
// AppendHistoryEvents provides a mock function with given fields: request
3237
func (_m *HistoryManager) AppendHistoryEvents(request *persistence.AppendHistoryEventsRequest) error {
3338
ret := _m.Called(request)

common/mocks/MetadataManager.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ type MetadataManager struct {
2828
mock.Mock
2929
}
3030

31+
// Close provides a mock function with given fields:
32+
func (_m *MetadataManager) Close() {
33+
_m.Called()
34+
}
35+
3136
// CreateDomain provides a mock function with given fields: request
3237
func (_m *MetadataManager) CreateDomain(request *persistence.CreateDomainRequest) (*persistence.CreateDomainResponse, error) {
3338
ret := _m.Called(request)

common/mocks/ShardManager.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ type ShardManager struct {
2828
mock.Mock
2929
}
3030

31+
// Close provides a mock function with given fields:
32+
func (_m *ShardManager) Close() {
33+
_m.Called()
34+
}
35+
3136
// CreateShard provides a mock function with given fields: request
3237
func (_m *ShardManager) CreateShard(request *persistence.CreateShardRequest) error {
3338
ret := _m.Called(request)

common/mocks/TaskManager.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ type TaskManager struct {
2828
mock.Mock
2929
}
3030

31+
// Close provides a mock function with given fields:
32+
func (_m *TaskManager) Close() {
33+
_m.Called()
34+
}
35+
3136
// LeaseTaskList provides a mock function with given fields: request
3237
func (_m *TaskManager) LeaseTaskList(request *persistence.LeaseTaskListRequest) (*persistence.LeaseTaskListResponse, error) {
3338
ret := _m.Called(request)

common/mocks/VisibilityManager.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ type VisibilityManager struct {
2828
mock.Mock
2929
}
3030

31+
// Close provides a mock function with given fields:
32+
func (_m *VisibilityManager) Close() {
33+
_m.Called()
34+
}
35+
3136
// GetClosedWorkflowExecution provides a mock function with given fields: request
3237
func (_m *VisibilityManager) GetClosedWorkflowExecution(request *persistence.GetClosedWorkflowExecutionRequest) (*persistence.GetClosedWorkflowExecutionResponse, error) {
3338
ret := _m.Called(request)

common/persistence/cassandraHistoryPersistence.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,13 @@ func NewCassandraHistoryPersistence(hosts string, dc string, keyspace string, lo
7777
return &cassandraHistoryPersistence{session: session, logger: logger}, nil
7878
}
7979

80+
// Close gracefully releases the resources held by this object
81+
func (h *cassandraHistoryPersistence) Close() {
82+
if h.session != nil {
83+
h.session.Close()
84+
}
85+
}
86+
8087
func (h *cassandraHistoryPersistence) AppendHistoryEvents(request *AppendHistoryEventsRequest) error {
8188
var query *gocql.Query
8289
if request.Overwrite {

common/persistence/cassandraMetadataPersistence.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,13 @@ func NewCassandraMetadataPersistence(hosts string, dc string, keyspace string, l
105105
return &cassandraMetadataPersistence{session: session, logger: logger}, nil
106106
}
107107

108+
// Close releases the resources held by this object
109+
func (m *cassandraMetadataPersistence) Close() {
110+
if m.session != nil {
111+
m.session.Close()
112+
}
113+
}
114+
108115
// Cassandra does not support conditional updates across multiple tables. For this reason we have to first insert into
109116
// 'Domains' table and then do a conditional insert into domains_by_name table. If the conditional write fails we
110117
// delete the orphaned entry from domains table. There is a chance delete entry could fail and we never delete the

common/persistence/cassandraPersistence.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,13 @@ func NewCassandraTaskPersistence(hosts string, dc string, keyspace string, logge
517517
return &cassandraPersistence{shardID: -1, session: session, lowConslevel: gocql.One, logger: logger}, nil
518518
}
519519

520+
// Close releases the underlying resources held by this object
521+
func (d *cassandraPersistence) Close() {
522+
if d.session != nil {
523+
d.session.Close()
524+
}
525+
}
526+
520527
func (d *cassandraPersistence) CreateShard(request *CreateShardRequest) error {
521528
cqlNowTimestamp := common.UnixNanoToCQLTimestamp(time.Now().UnixNano())
522529
shardInfo := request.ShardInfo

common/persistence/cassandraVisibilityPersistence.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,13 @@ func NewCassandraVisibilityPersistence(
140140
return &cassandraVisibilityPersistence{session: session, lowConslevel: gocql.One, logger: logger}, nil
141141
}
142142

143+
// Close releases the resources held by this object
144+
func (v *cassandraVisibilityPersistence) Close() {
145+
if v.session != nil {
146+
v.session.Close()
147+
}
148+
}
149+
143150
func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionStarted(
144151
request *RecordWorkflowExecutionStartedRequest) error {
145152
query := v.session.Query(templateCreateWorkflowExecutionStarted,

common/persistence/dataInterfaces.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -594,15 +594,22 @@ type (
594594
Name string
595595
}
596596

597+
// Closeable is an interface for any entity that supports a close operation to release resources
598+
Closeable interface {
599+
Close()
600+
}
601+
597602
// ShardManager is used to manage all shards
598603
ShardManager interface {
604+
Closeable
599605
CreateShard(request *CreateShardRequest) error
600606
GetShard(request *GetShardRequest) (*GetShardResponse, error)
601607
UpdateShard(request *UpdateShardRequest) error
602608
}
603609

604610
// ExecutionManager is used to manage workflow executions
605611
ExecutionManager interface {
612+
Closeable
606613
CreateWorkflowExecution(request *CreateWorkflowExecutionRequest) (*CreateWorkflowExecutionResponse, error)
607614
GetWorkflowExecution(request *GetWorkflowExecutionRequest) (*GetWorkflowExecutionResponse, error)
608615
UpdateWorkflowExecution(request *UpdateWorkflowExecutionRequest) error
@@ -623,6 +630,7 @@ type (
623630

624631
// TaskManager is used to manage tasks
625632
TaskManager interface {
633+
Closeable
626634
LeaseTaskList(request *LeaseTaskListRequest) (*LeaseTaskListResponse, error)
627635
UpdateTaskList(request *UpdateTaskListRequest) (*UpdateTaskListResponse, error)
628636
CreateTasks(request *CreateTasksRequest) (*CreateTasksResponse, error)
@@ -632,6 +640,7 @@ type (
632640

633641
// HistoryManager is used to manage Workflow Execution HistoryEventBatch
634642
HistoryManager interface {
643+
Closeable
635644
AppendHistoryEvents(request *AppendHistoryEventsRequest) error
636645
// GetWorkflowExecutionHistory retrieves the paginated list of history events for given execution
637646
GetWorkflowExecutionHistory(request *GetWorkflowExecutionHistoryRequest) (*GetWorkflowExecutionHistoryResponse,
@@ -641,6 +650,7 @@ type (
641650

642651
// MetadataManager is used to manage metadata CRUD for various entities
643652
MetadataManager interface {
653+
Closeable
644654
CreateDomain(request *CreateDomainRequest) (*CreateDomainResponse, error)
645655
GetDomain(request *GetDomainRequest) (*GetDomainResponse, error)
646656
UpdateDomain(request *UpdateDomainRequest) error

common/persistence/persistenceMetricClients.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,10 @@ func (p *shardPersistenceClient) UpdateShard(request *UpdateShardRequest) error
154154
return err
155155
}
156156

157+
func (p *shardPersistenceClient) Close() {
158+
p.persistence.Close()
159+
}
160+
157161
func (p *workflowExecutionPersistenceClient) CreateWorkflowExecution(request *CreateWorkflowExecutionRequest) (*CreateWorkflowExecutionResponse, error) {
158162
p.metricClient.IncCounter(metrics.PersistenceCreateWorkflowExecutionScope, metrics.PersistenceRequests)
159163

@@ -296,6 +300,10 @@ func (p *workflowExecutionPersistenceClient) updateErrorMetric(scope int, err er
296300
}
297301
}
298302

303+
func (p *workflowExecutionPersistenceClient) Close() {
304+
p.persistence.Close()
305+
}
306+
299307
func (p *taskPersistenceClient) CreateTasks(request *CreateTasksRequest) (*CreateTasksResponse, error) {
300308
p.metricClient.IncCounter(metrics.PersistenceCreateTaskScope, metrics.PersistenceRequests)
301309

@@ -378,6 +386,10 @@ func (p *taskPersistenceClient) updateErrorMetric(scope int, err error) {
378386
}
379387
}
380388

389+
func (p *taskPersistenceClient) Close() {
390+
p.persistence.Close()
391+
}
392+
381393
func (p *historyPersistenceClient) AppendHistoryEvents(request *AppendHistoryEventsRequest) error {
382394
p.metricClient.IncCounter(metrics.PersistenceAppendHistoryEventsScope, metrics.PersistenceRequests)
383395

@@ -436,6 +448,10 @@ func (p *historyPersistenceClient) updateErrorMetric(scope int, err error) {
436448
}
437449
}
438450

451+
func (p *historyPersistenceClient) Close() {
452+
p.persistence.Close()
453+
}
454+
439455
func (p *metadataPersistenceClient) CreateDomain(request *CreateDomainRequest) (*CreateDomainResponse, error) {
440456
p.metricClient.IncCounter(metrics.PersistenceCreateDomainScope, metrics.PersistenceRequests)
441457

@@ -506,6 +522,10 @@ func (p *metadataPersistenceClient) DeleteDomainByName(request *DeleteDomainByNa
506522
return err
507523
}
508524

525+
func (p *metadataPersistenceClient) Close() {
526+
p.persistence.Close()
527+
}
528+
509529
func (p *metadataPersistenceClient) updateErrorMetric(scope int, err error) {
510530
switch err.(type) {
511531
case *workflow.DomainAlreadyExistsError:

common/persistence/visibilityInterfaces.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ type (
105105

106106
// VisibilityManager is used to manage the visibility store
107107
VisibilityManager interface {
108+
Closeable
108109
RecordWorkflowExecutionStarted(request *RecordWorkflowExecutionStartedRequest) error
109110
RecordWorkflowExecutionClosed(request *RecordWorkflowExecutionClosedRequest) error
110111
ListOpenWorkflowExecutions(request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error)

service/frontend/handler.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,9 @@ func (wh *WorkflowHandler) Start(thriftService []thrift.TChanServer) error {
121121

122122
// Stop stops the handler
123123
func (wh *WorkflowHandler) Stop() {
124+
wh.metadataMgr.Close()
125+
wh.visibitiltyMgr.Close()
126+
wh.historyMgr.Close()
124127
wh.Service.Stop()
125128
}
126129

service/history/handler.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,10 @@ func (h *Handler) Start(thriftService []thrift.TChanServer) error {
113113
// Stop stops the handler
114114
func (h *Handler) Stop() {
115115
h.controller.Stop()
116+
h.shardManager.Close()
117+
h.historyMgr.Close()
118+
h.metadataMgr.Close()
119+
h.visibilityMgr.Close()
116120
h.Service.Stop()
117121
}
118122

service/history/shardController.go

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,14 @@ type (
6565
}
6666

6767
historyShardsItem struct {
68-
shardID int
69-
shardMgr persistence.ShardManager
70-
historyMgr persistence.HistoryManager
71-
executionMgrFactory persistence.ExecutionManagerFactory
72-
engineFactory EngineFactory
73-
host *membership.HostInfo
74-
logger bark.Logger
75-
metricsClient metrics.Client
68+
shardID int
69+
shardMgr persistence.ShardManager
70+
historyMgr persistence.HistoryManager
71+
executionMgr persistence.ExecutionManager
72+
engineFactory EngineFactory
73+
host *membership.HostInfo
74+
logger bark.Logger
75+
metricsClient metrics.Client
7676

7777
sync.RWMutex
7878
engine Engine
@@ -106,19 +106,25 @@ func newShardController(numberOfShards int, host *membership.HostInfo, resolver
106106

107107
func newHistoryShardsItem(shardID int, shardMgr persistence.ShardManager, historyMgr persistence.HistoryManager,
108108
executionMgrFactory persistence.ExecutionManagerFactory, factory EngineFactory, host *membership.HostInfo,
109-
logger bark.Logger, reporter metrics.Client) *historyShardsItem {
109+
logger bark.Logger, reporter metrics.Client) (*historyShardsItem, error) {
110+
111+
executionMgr, err := executionMgrFactory.CreateExecutionManager(shardID)
112+
if err != nil {
113+
return nil, err
114+
}
115+
110116
return &historyShardsItem{
111-
shardID: shardID,
112-
shardMgr: shardMgr,
113-
historyMgr: historyMgr,
114-
executionMgrFactory: executionMgrFactory,
115-
engineFactory: factory,
116-
host: host,
117+
shardID: shardID,
118+
shardMgr: shardMgr,
119+
historyMgr: historyMgr,
120+
executionMgr: executionMgr,
121+
engineFactory: factory,
122+
host: host,
117123
logger: logger.WithFields(bark.Fields{
118124
logging.TagHistoryShardID: shardID,
119125
}),
120126
metricsClient: reporter,
121-
}
127+
}, nil
122128
}
123129

124130
func (c *shardController) Start() {
@@ -204,8 +210,11 @@ func (c *shardController) getOrCreateHistoryShardItem(shardID int) (*historyShar
204210
}
205211

206212
if info.Identity() == c.host.Identity() {
207-
shardItem := newHistoryShardsItem(shardID, c.shardMgr, c.historyMgr, c.executionMgrFactory, c.engineFactory, c.host,
213+
shardItem, err := newHistoryShardsItem(shardID, c.shardMgr, c.historyMgr, c.executionMgrFactory, c.engineFactory, c.host,
208214
c.logger, c.metricsClient)
215+
if err != nil {
216+
return nil, err
217+
}
209218
c.historyShards[shardID] = shardItem
210219
logging.LogShardItemCreatedEvent(shardItem.logger, info.Identity(), shardID)
211220
return shardItem, nil
@@ -304,13 +313,8 @@ func (i *historyShardsItem) getOrCreateEngine(shardClosedCh chan<- int) (Engine,
304313
}
305314

306315
logging.LogShardEngineCreatingEvent(i.logger, i.host.Identity(), i.shardID)
307-
defer logging.LogShardEngineCreatedEvent(i.logger, i.host.Identity(), i.shardID)
308-
executionMgr, err := i.executionMgrFactory.CreateExecutionManager(i.shardID)
309-
if err != nil {
310-
return nil, err
311-
}
312316

313-
context, err := acquireShard(i.shardID, i.shardMgr, i.historyMgr, executionMgr, i.host.Identity(), shardClosedCh,
317+
context, err := acquireShard(i.shardID, i.shardMgr, i.historyMgr, i.executionMgr, i.host.Identity(), shardClosedCh,
314318
i.logger, i.metricsClient)
315319
if err != nil {
316320
return nil, err
@@ -319,18 +323,20 @@ func (i *historyShardsItem) getOrCreateEngine(shardClosedCh chan<- int) (Engine,
319323
i.engine = i.engineFactory.CreateEngine(context)
320324
i.engine.Start()
321325

326+
logging.LogShardEngineCreatedEvent(i.logger, i.host.Identity(), i.shardID)
322327
return i.engine, nil
323328
}
324329

325330
func (i *historyShardsItem) stopEngine() {
326-
logging.LogShardEngineStoppingEvent(i.logger, i.host.Identity(), i.shardID)
327-
defer logging.LogShardEngineStoppedEvent(i.logger, i.host.Identity(), i.shardID)
328331
i.Lock()
329332
defer i.Unlock()
330333

331334
if i.engine != nil {
335+
logging.LogShardEngineStoppingEvent(i.logger, i.host.Identity(), i.shardID)
332336
i.engine.Stop()
333337
i.engine = nil
338+
i.executionMgr.Close()
339+
logging.LogShardEngineStoppedEvent(i.logger, i.host.Identity(), i.shardID)
334340
}
335341
}
336342

service/history/shardController_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,7 @@ func (s *shardControllerSuite) TestShardControllerClosed() {
440440
func (s *shardControllerSuite) setupMocksForAcquireShard(shardID int, mockEngine *MockHistoryEngine, currentRangeID,
441441
newRangeID int64) {
442442
mockExecutionMgr := &mmocks.ExecutionManager{}
443+
mockExecutionMgr.On("Close").Return()
443444
s.mockExecutionMgrFactory.On("CreateExecutionManager", shardID).Return(mockExecutionMgr, nil).Once()
444445
mockEngine.On("Start").Return().Once()
445446
s.mockServiceResolver.On("Lookup", string(shardID)).Return(s.hostInfo, nil).Twice()

service/matching/handler.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ func (h *Handler) Start(thriftService []thrift.TChanServer) error {
6969

7070
// Stop stops the handler
7171
func (h *Handler) Stop() {
72+
h.engine.Stop()
73+
h.taskPersistence.Close()
7274
h.Service.Stop()
7375
}
7476

service/matching/matchingEngineInterfaces.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
type (
3030
// Engine exposes interfaces for clients to poll for activity and decision tasks.
3131
Engine interface {
32+
Stop()
3233
AddDecisionTask(addRequest *m.AddDecisionTaskRequest) error
3334
AddActivityTask(addRequest *m.AddActivityTaskRequest) error
3435
PollForDecisionTask(ctx thrift.Context, request *m.PollForDecisionTaskRequest) (*m.PollForDecisionTaskResponse, error)

0 commit comments

Comments
 (0)