Skip to content

Commit 9ccd575

Browse files
authored
Transfer Queue Processor can get stuck on StartChildExecution task (#337)
Handle EntityNotExistError in the processing of transfer task for StartChildExecution.
1 parent 37a6a24 commit 9ccd575

File tree

4 files changed

+157
-10
lines changed

4 files changed

+157
-10
lines changed

common/persistence/persistenceTestBase.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,18 @@ func (s *TestBase) UpdateWorkflowExecutionWithTransferTasks(
519519
})
520520
}
521521

522+
// UpdateWorkflowExecutionForChildExecutionsInitiated is a utility method to update workflow execution
523+
func (s *TestBase) UpdateWorkflowExecutionForChildExecutionsInitiated(
524+
updatedInfo *WorkflowExecutionInfo, condition int64, transferTasks []Task, childInfos []*ChildExecutionInfo) error {
525+
return s.WorkflowMgr.UpdateWorkflowExecution(&UpdateWorkflowExecutionRequest{
526+
ExecutionInfo: updatedInfo,
527+
TransferTasks: transferTasks,
528+
Condition: condition,
529+
UpsertChildExecutionInfos: childInfos,
530+
RangeID: s.ShardInfo.RangeID,
531+
})
532+
}
533+
522534
// UpdateWorkflowExecutionForRequestCancel is a utility method to update workflow execution
523535
func (s *TestBase) UpdateWorkflowExecutionForRequestCancel(
524536
updatedInfo *WorkflowExecutionInfo, condition int64, transferTasks []Task,

service/history/historyEngine_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2512,6 +2512,24 @@ func addRequestCancelInitiatedEvent(builder *mutableStateBuilder, decisionComple
25122512
return event
25132513
}
25142514

2515+
func addStartChildWorkflowExecutionInitiatedEvent(builder *mutableStateBuilder, decisionCompletedID int64,
2516+
createRequestID, domain, workflowID, workflowType, tasklist string, input []byte,
2517+
executionStartToCloseTimeout, taskStartToCloseTimeout int32) (*workflow.HistoryEvent,
2518+
*persistence.ChildExecutionInfo) {
2519+
return builder.AddStartChildWorkflowExecutionInitiatedEvent(decisionCompletedID, createRequestID,
2520+
&workflow.StartChildWorkflowExecutionDecisionAttributes{
2521+
Domain: common.StringPtr(domain),
2522+
WorkflowId: common.StringPtr(workflowID),
2523+
WorkflowType: &workflow.WorkflowType{Name: common.StringPtr(workflowType)},
2524+
TaskList: &workflow.TaskList{Name: common.StringPtr(tasklist)},
2525+
Input: input,
2526+
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(executionStartToCloseTimeout),
2527+
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(taskStartToCloseTimeout),
2528+
ChildPolicy: workflow.ChildPolicyPtr(workflow.ChildPolicy_TERMINATE),
2529+
Control: nil,
2530+
})
2531+
}
2532+
25152533
func addCompleteWorkflowEvent(builder *mutableStateBuilder, decisionCompletedEventID int64,
25162534
result []byte) *workflow.HistoryEvent {
25172535
e := builder.AddCompletedWorkflowEvent(decisionCompletedEventID, &workflow.CompleteWorkflowExecutionDecisionAttributes{

service/history/transferQueueProcessor.go

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -651,21 +651,15 @@ func (t *transferQueueProcessorImpl) processStartChildExecution(task *persistenc
651651
return err
652652
}
653653
// Finally create first decision task for Child execution so it is really started
654-
err = t.historyClient.ScheduleDecisionTask(nil, &history.ScheduleDecisionTaskRequest{
655-
DomainUUID: common.StringPtr(targetDomainID),
656-
WorkflowExecution: &workflow.WorkflowExecution{
657-
WorkflowId: common.StringPtr(task.TargetWorkflowID),
658-
RunId: common.StringPtr(startResponse.GetRunId()),
659-
},
654+
err = t.createFirstDecisionTask(targetDomainID, &workflow.WorkflowExecution{
655+
WorkflowId: common.StringPtr(task.TargetWorkflowID),
656+
RunId: common.StringPtr(startResponse.GetRunId()),
660657
})
661658
} else {
662659
// ChildExecution already started, just create DecisionTask and complete transfer task
663660
startedEvent, _ := msBuilder.GetChildExecutionStartedEvent(initiatedEventID)
664661
startedAttributes := startedEvent.GetChildWorkflowExecutionStartedEventAttributes()
665-
err = t.historyClient.ScheduleDecisionTask(nil, &history.ScheduleDecisionTaskRequest{
666-
DomainUUID: common.StringPtr(targetDomainID),
667-
WorkflowExecution: startedAttributes.GetWorkflowExecution(),
668-
})
662+
err = t.createFirstDecisionTask(targetDomainID, startedAttributes.GetWorkflowExecution())
669663
}
670664
}
671665

@@ -744,6 +738,26 @@ func (t *transferQueueProcessorImpl) recordStartChildExecutionFailed(task *persi
744738
})
745739
}
746740

741+
// createFirstDecisionTask is used by StartChildExecution transfer task to create the first decision task for
742+
// child execution.
743+
func (t *transferQueueProcessorImpl) createFirstDecisionTask(domainID string,
744+
execution *workflow.WorkflowExecution) error {
745+
err := t.historyClient.ScheduleDecisionTask(nil, &history.ScheduleDecisionTaskRequest{
746+
DomainUUID: common.StringPtr(domainID),
747+
WorkflowExecution: execution,
748+
})
749+
750+
if err != nil {
751+
if _, ok := err.(*workflow.EntityNotExistsError); ok {
752+
// Maybe child workflow execution already timedout or terminated
753+
// Safe to discard the error and complete this transfer task
754+
return nil
755+
}
756+
}
757+
758+
return err
759+
}
760+
747761
func (t *transferQueueProcessorImpl) requestCancelCompleted(task *persistence.TransferTaskInfo,
748762
context *workflowExecutionContext,
749763
request *history.RequestCancelWorkflowExecutionRequest) error {

service/history/transferQueueProcessor_test.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/stretchr/testify/mock"
2929
"github.com/stretchr/testify/suite"
3030

31+
"github.com/pborman/uuid"
3132
"github.com/uber-common/bark"
3233
m "github.com/uber/cadence/.gen/go/matching"
3334
workflow "github.com/uber/cadence/.gen/go/shared"
@@ -424,6 +425,108 @@ workerPump:
424425
}
425426
}
426427

428+
func (s *transferQueueProcessorSuite) TestStartChildExecutionTransferTasks() {
429+
domain := "start-child-execution-transfer-tasks-test-domain"
430+
domainID := "b7f71853-0a8c-4eb1-af6b-c4e71dae50a1"
431+
workflowID := "start-child-execution-transfertasks-test"
432+
runID := "67ad6d62-79c6-4c8a-a27a-1fb7a10ae789"
433+
workflowExecution := workflow.WorkflowExecution{
434+
WorkflowId: common.StringPtr(workflowID),
435+
RunId: common.StringPtr(runID),
436+
}
437+
taskList := "start-child-execution-transfertasks-queue"
438+
identity := "start-child-execution-transfertasks-test"
439+
440+
tasksCh := s.createChildExecutionState(domain, domainID, workflowExecution, taskList, identity)
441+
childRunID := "d3f9164e-b696-4350-8409-9a6cf670f4f2"
442+
workerPump:
443+
for {
444+
select {
445+
case task := <-tasksCh:
446+
if task.TaskType == persistence.TransferTaskTypeStartChildExecution {
447+
s.mockHistoryClient.On("StartWorkflowExecution", mock.Anything, mock.Anything).Once().Return(
448+
&workflow.StartWorkflowExecutionResponse{
449+
RunId: common.StringPtr(childRunID),
450+
}, nil)
451+
s.mockHistoryClient.On("ScheduleDecisionTask", mock.Anything, mock.Anything).Once().Return(nil)
452+
}
453+
s.processor.processTransferTask(task)
454+
default:
455+
break workerPump
456+
}
457+
}
458+
}
459+
460+
func (s *transferQueueProcessorSuite) TestStartChildExecutionTransferTasksChildCompleted() {
461+
domain := "start-child-execution-transfer-tasks-child-completed-test-domain"
462+
domainID := "8bf7aaf8-810a-4778-a549-d7913d2a5b82"
463+
workflowID := "start-child-execution-transfertasks-child-completed-test"
464+
runID := "a627ea38-7e5e-41cc-9a32-4c7979b93ae2"
465+
workflowExecution := workflow.WorkflowExecution{
466+
WorkflowId: common.StringPtr(workflowID),
467+
RunId: common.StringPtr(runID),
468+
}
469+
taskList := "start-child-execution-transfertasks-child-completed-queue"
470+
identity := "start-child-execution-transfertasks-child-completed-test"
471+
472+
tasksCh := s.createChildExecutionState(domain, domainID, workflowExecution, taskList, identity)
473+
childRunID := "66825b60-5ae2-4ff3-8da6-cbb286b4a7e6"
474+
workerPump:
475+
for {
476+
select {
477+
case task := <-tasksCh:
478+
if task.TaskType == persistence.TransferTaskTypeStartChildExecution {
479+
s.mockHistoryClient.On("StartWorkflowExecution", mock.Anything, mock.Anything).Once().Return(
480+
&workflow.StartWorkflowExecutionResponse{
481+
RunId: common.StringPtr(childRunID),
482+
}, nil)
483+
s.mockHistoryClient.On("ScheduleDecisionTask", mock.Anything, mock.Anything).Once().Return(
484+
&workflow.EntityNotExistsError{})
485+
}
486+
s.processor.processTransferTask(task)
487+
default:
488+
break workerPump
489+
}
490+
}
491+
}
492+
493+
func (s *transferQueueProcessorSuite) createChildExecutionState(domain, domainID string,
494+
workflowExecution workflow.WorkflowExecution, taskList, identity string) chan *persistence.TransferTaskInfo {
495+
_, err0 := s.CreateWorkflowExecution(domainID, workflowExecution, taskList, "wType", 10, nil, 3, 0, 2, nil)
496+
s.Nil(err0, "No error expected.")
497+
s.mockMatching.On("AddDecisionTask", mock.Anything, mock.Anything).Once().Return(nil)
498+
s.mockVisibilityMgr.On("RecordWorkflowExecutionStarted", mock.Anything).Once().Return(nil)
499+
500+
builder := newMutableStateBuilder(s.ShardContext.GetConfig(), s.logger)
501+
info1, _ := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
502+
builder.Load(info1)
503+
startedEvent := addDecisionTaskStartedEvent(builder, int64(2), taskList, identity)
504+
completedEvent := addDecisionTaskCompletedEvent(builder, int64(2), startedEvent.GetEventId(), nil, identity)
505+
506+
transferTasks := []persistence.Task{}
507+
createRequestID := uuid.New()
508+
509+
childWorkflowID := "start-child-execution-transfertasks-test-child-workflow-id"
510+
childWorkflowType := "child-workflow-type"
511+
_, ci := addStartChildWorkflowExecutionInitiatedEvent(builder, completedEvent.GetEventId(), createRequestID,
512+
domain, childWorkflowID, childWorkflowType, taskList, nil, int32(100), int32(10))
513+
transferTasks = append(transferTasks, &persistence.StartChildExecutionTask{
514+
TargetDomainID: domainID,
515+
TargetWorkflowID: childWorkflowID,
516+
InitiatedID: ci.InitiatedID,
517+
})
518+
519+
updatedInfo := copyWorkflowExecutionInfo(builder.executionInfo)
520+
err1 := s.UpdateWorkflowExecutionForChildExecutionsInitiated(updatedInfo, int64(3), transferTasks,
521+
builder.updateChildExecutionInfos)
522+
s.Nil(err1, "No error expected.")
523+
524+
tasksCh := make(chan *persistence.TransferTaskInfo, 10)
525+
s.processor.processTransferTasks(tasksCh)
526+
527+
return tasksCh
528+
}
529+
427530
func createAddRequestFromTask(task *persistence.TransferTaskInfo, scheduleToStartTimeout int32) interface{} {
428531
var res interface{}
429532
domainID := task.DomainID

0 commit comments

Comments
 (0)