Skip to content

Commit 6d8ad64

Browse files
authored
Delete workflow execution on workflow time out. (#267)
1 parent 9971c83 commit 6d8ad64

File tree

4 files changed

+86
-24
lines changed

4 files changed

+86
-24
lines changed

host/integration_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2449,6 +2449,8 @@ func (s *integrationSuite) TestChildWorkflowWithContinueAsNew() {
24492449
}
24502450

24512451
func (s *integrationSuite) TestWorkflowTimeout() {
2452+
startTime := time.Now().UnixNano()
2453+
24522454
id := "integration-workflow-timeout-test"
24532455
wt := "integration-workflow-timeout-type"
24542456
tl := "integration-workflow-timeout-tasklist"
@@ -2505,6 +2507,29 @@ GetHistoryLoop:
25052507
break GetHistoryLoop
25062508
}
25072509
s.True(workflowComplete)
2510+
2511+
startFilter := workflow.NewStartTimeFilter()
2512+
startFilter.EarliestTime = common.Int64Ptr(startTime)
2513+
startFilter.LatestTime = common.Int64Ptr(time.Now().UnixNano())
2514+
2515+
closedCount := 0
2516+
ListClosedLoop:
2517+
for i := 0; i < 10; i++ {
2518+
resp, err3 := s.engine.ListClosedWorkflowExecutions(&workflow.ListClosedWorkflowExecutionsRequest{
2519+
Domain: common.StringPtr(s.domainName),
2520+
MaximumPageSize: common.Int32Ptr(100),
2521+
StartTimeFilter: startFilter,
2522+
})
2523+
s.Nil(err3)
2524+
closedCount = len(resp.Executions)
2525+
if closedCount == 0 {
2526+
s.logger.Info("Closed WorkflowExecution is not yet visibile")
2527+
time.Sleep(100 * time.Millisecond)
2528+
continue ListClosedLoop
2529+
}
2530+
break ListClosedLoop
2531+
}
2532+
s.Equal(1, closedCount)
25082533
}
25092534

25102535
func (s *integrationSuite) setupShards() {

service/history/historyEngine.go

Lines changed: 36 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -863,21 +863,12 @@ Update_History_Loop:
863863
}
864864

865865
if isComplete {
866-
// Generate a transfer task to delete workflow execution
867-
transferTasks = append(transferTasks, &persistence.DeleteExecutionTask{})
868-
869-
// Generate a timer task to cleanup history events for this workflow execution
870-
var retentionInDays int32
871-
_, domainConfig, err := e.domainCache.GetDomainByID(domainID)
866+
tranT, timerT, err := e.getDeleteWorkflowTasks(domainID, context)
872867
if err != nil {
873-
if _, ok := err.(*workflow.EntityNotExistsError); !ok {
874-
return err
875-
}
876-
} else {
877-
retentionInDays = domainConfig.Retention
868+
return err
878869
}
879-
cleanupTask := context.tBuilder.createDeleteHistoryEventTimerTask(time.Duration(retentionInDays) * time.Hour * 24)
880-
timerTasks = append(timerTasks, cleanupTask)
870+
transferTasks = append(transferTasks, tranT)
871+
timerTasks = append(timerTasks, timerT)
881872
}
882873

883874
// Generate a transaction ID for appending events to history
@@ -1406,14 +1397,19 @@ Update_History_Loop:
14061397
return err1
14071398
}
14081399

1409-
var transferTasks []persistence.Task
14101400
if err := action(msBuilder); err != nil {
14111401
return err
14121402
}
14131403

1404+
var transferTasks []persistence.Task
1405+
var timerTasks []persistence.Task
14141406
if createDeletionTask {
1415-
// Create a transfer task to delete workflow execution
1416-
transferTasks = append(transferTasks, &persistence.DeleteExecutionTask{})
1407+
tranT, timerT, err := e.getDeleteWorkflowTasks(domainID, context)
1408+
if err != nil {
1409+
return err
1410+
}
1411+
transferTasks = append(transferTasks, tranT)
1412+
timerTasks = append(timerTasks, timerT)
14171413
}
14181414

14191415
if createDecisionTask {
@@ -1436,7 +1432,7 @@ Update_History_Loop:
14361432

14371433
// We apply the update to execution using optimistic concurrency. If it fails due to a conflict then reload
14381434
// the history and try the operation again.
1439-
if err := context.updateWorkflowExecution(transferTasks, nil, transactionID); err != nil {
1435+
if err := context.updateWorkflowExecution(transferTasks, timerTasks, transactionID); err != nil {
14401436
if err == ErrConflict {
14411437
continue Update_History_Loop
14421438
}
@@ -1447,6 +1443,29 @@ Update_History_Loop:
14471443
return ErrMaxAttemptsExceeded
14481444
}
14491445

1446+
func (e *historyEngineImpl) getDeleteWorkflowTasks(
1447+
domainID string,
1448+
context *workflowExecutionContext,
1449+
) (persistence.Task, persistence.Task, error) {
1450+
1451+
// Create a transfer task to delete workflow execution
1452+
deleteTask := &persistence.DeleteExecutionTask{}
1453+
1454+
// Generate a timer task to cleanup history events for this workflow execution
1455+
var retentionInDays int32
1456+
_, domainConfig, err := e.domainCache.GetDomainByID(domainID)
1457+
if err != nil {
1458+
if _, ok := err.(*workflow.EntityNotExistsError); !ok {
1459+
return nil, nil, err
1460+
}
1461+
} else {
1462+
retentionInDays = domainConfig.Retention
1463+
}
1464+
cleanupTask := context.tBuilder.createDeleteHistoryEventTimerTask(time.Duration(retentionInDays) * time.Hour * 24)
1465+
1466+
return deleteTask, cleanupTask, nil
1467+
}
1468+
14501469
func (e *historyEngineImpl) createRecordDecisionTaskStartedResponse(domainID string, msBuilder *mutableStateBuilder,
14511470
startedEventID int64) *h.RecordDecisionTaskStartedResponse {
14521471
response := h.NewRecordDecisionTaskStartedResponse()

service/history/timerQueueProcessor.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -540,7 +540,7 @@ Update_History_Loop:
540540

541541
// We apply the update to execution using optimistic concurrency. If it fails due to a conflict than reload
542542
// the history and try the operation again.
543-
err := t.updateWorkflowExecution(context, msBuilder, scheduleNewDecision, timerTasks, nil)
543+
err := t.updateWorkflowExecution(context, msBuilder, scheduleNewDecision, false, timerTasks, nil)
544544
if err != nil {
545545
if err == ErrConflict {
546546
continue Update_History_Loop
@@ -667,7 +667,7 @@ Update_History_Loop:
667667
// We apply the update to execution using optimistic concurrency. If it fails due to a conflict than reload
668668
// the history and try the operation again.
669669
defer t.NotifyNewTimer(timerTasks)
670-
err := t.updateWorkflowExecution(context, msBuilder, scheduleNewDecision, timerTasks, nil)
670+
err := t.updateWorkflowExecution(context, msBuilder, scheduleNewDecision, false, timerTasks, nil)
671671
if err != nil {
672672
if err == ErrConflict {
673673
continue Update_History_Loop
@@ -744,7 +744,7 @@ Update_History_Loop:
744744
if scheduleNewDecision {
745745
// We apply the update to execution using optimistic concurrency. If it fails due to a conflict than reload
746746
// the history and try the operation again.
747-
err := t.updateWorkflowExecution(context, msBuilder, scheduleNewDecision, nil, nil)
747+
err := t.updateWorkflowExecution(context, msBuilder, scheduleNewDecision, false, nil, nil)
748748
if err != nil {
749749
if err == ErrConflict {
750750
continue Update_History_Loop
@@ -789,7 +789,7 @@ Update_History_Loop:
789789

790790
// We apply the update to execution using optimistic concurrency. If it fails due to a conflict than reload
791791
// the history and try the operation again.
792-
err := t.updateWorkflowExecution(context, msBuilder, false, nil, nil)
792+
err := t.updateWorkflowExecution(context, msBuilder, false, true, nil, nil)
793793
if err != nil {
794794
if err == ErrConflict {
795795
continue Update_History_Loop
@@ -800,9 +800,14 @@ Update_History_Loop:
800800
return ErrMaxAttemptsExceeded
801801
}
802802

803-
func (t *timerQueueProcessorImpl) updateWorkflowExecution(context *workflowExecutionContext,
804-
msBuilder *mutableStateBuilder, scheduleNewDecision bool, timerTasks []persistence.Task,
805-
clearTimerTask persistence.Task) error {
803+
func (t *timerQueueProcessorImpl) updateWorkflowExecution(
804+
context *workflowExecutionContext,
805+
msBuilder *mutableStateBuilder,
806+
scheduleNewDecision bool,
807+
createDeletionTask bool,
808+
timerTasks []persistence.Task,
809+
clearTimerTask persistence.Task,
810+
) error {
806811
var transferTasks []persistence.Task
807812
if scheduleNewDecision {
808813
// Schedule a new decision.
@@ -814,6 +819,15 @@ func (t *timerQueueProcessorImpl) updateWorkflowExecution(context *workflowExecu
814819
}}
815820
}
816821

822+
if createDeletionTask {
823+
tranT, timerT, err := t.historyService.getDeleteWorkflowTasks(msBuilder.executionInfo.DomainID, context)
824+
if err != nil {
825+
return nil
826+
}
827+
transferTasks = append(transferTasks, tranT)
828+
timerTasks = append(timerTasks, timerT)
829+
}
830+
817831
// Generate a transaction ID for appending events to history
818832
transactionID, err1 := t.historyService.shard.GetNextTransferTaskID()
819833
if err1 != nil {

service/history/timerQueueProcessor2_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ func (s *timerQueueProcessor2Suite) SetupTest() {
8181
s.mockShardManager = &mocks.ShardManager{}
8282
s.mockHistoryMgr = &mocks.HistoryManager{}
8383
s.mockVisibilityMgr = &mocks.VisibilityManager{}
84+
s.mockMetadataMgr = &mocks.MetadataManager{}
8485
s.shardClosedCh = make(chan int, 100)
8586

8687
s.mockShard = &shardContextImpl{
@@ -109,6 +110,7 @@ func (s *timerQueueProcessor2Suite) SetupTest() {
109110
tokenSerializer: common.NewJSONTaskTokenSerializer(),
110111
hSerializerFactory: persistence.NewHistorySerializerFactory(),
111112
metricsClient: s.mockShard.GetMetricsClient(),
113+
domainCache: domainCache,
112114
}
113115
h.timerProcessor = newTimerQueueProcessor(s.mockShard, h, s.mockExecutionMgr, s.logger)
114116
s.mockHistoryEngine = h
@@ -219,6 +221,8 @@ func (s *timerQueueProcessor2Suite) TestWorkflowTimeout() {
219221
wfResponse := &persistence.GetWorkflowExecutionResponse{State: ms}
220222
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(wfResponse, nil).Once()
221223

224+
s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(
225+
&persistence.GetDomainResponse{Config: &persistence.DomainConfig{Retention: 1}}, nil).Once()
222226
s.mockExecutionMgr.On("CompleteTimerTask", mock.Anything).Return(nil).Once()
223227
s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once()
224228
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil).Run(func(arguments mock.Arguments) {

0 commit comments

Comments
 (0)