Skip to content

Commit ceeba99

Browse files
author
Tamer Eldeeb
authored
Complete Transfer tasks even after execution is completed (#230)
Issue #229
1 parent b733b1f commit ceeba99

File tree

2 files changed

+43
-0
lines changed

2 files changed

+43
-0
lines changed

service/history/transferQueueProcessor.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,10 @@ func (t *transferQueueProcessorImpl) processActivityTask(task *persistence.Trans
305305
timeout := int32(0)
306306
if err != nil {
307307
release()
308+
if _, ok := err.(*workflow.EntityNotExistsError); ok {
309+
// this could happen if this is a duplicate processing of the task, and the execution has already completed.
310+
return nil
311+
}
308312
return err
309313
}
310314

@@ -339,6 +343,10 @@ func (t *transferQueueProcessorImpl) processDecisionTask(task *persistence.Trans
339343
}
340344

341345
if err != nil {
346+
if _, ok := err.(*workflow.EntityNotExistsError); ok {
347+
// this could happen if this is a duplicate processing of the task, and the execution has already completed.
348+
return nil
349+
}
342350
return err
343351
}
344352

@@ -454,6 +462,10 @@ func (t *transferQueueProcessorImpl) processCancelExecution(task *persistence.Tr
454462
// Load workflow execution.
455463
_, err = context.loadWorkflowExecution()
456464
if err != nil {
465+
if _, ok := err.(*workflow.EntityNotExistsError); ok {
466+
// this could happen if this is a duplicate processing of the task, and the execution has already completed.
467+
return nil
468+
}
457469
return err
458470
}
459471

@@ -500,6 +512,10 @@ func (t *transferQueueProcessorImpl) processStartChildExecution(task *persistenc
500512
var msBuilder *mutableStateBuilder
501513
msBuilder, err = context.loadWorkflowExecution()
502514
if err != nil {
515+
if _, ok := err.(*workflow.EntityNotExistsError); ok {
516+
// this could happen if this is a duplicate processing of the task, and the execution has already completed.
517+
return nil
518+
}
503519
return err
504520
}
505521

service/history/transferQueueProcessor_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,33 @@ workerPump:
386386
s.mockHistoryClient.AssertExpectations(s.T())
387387
}
388388

389+
func (s *transferQueueProcessorSuite) TestCompleteTaskAfterExecutionDeleted() {
390+
domainID := "b677a307-8261-40ea-b239-ab2ec78e443b"
391+
workflowExecution := workflow.WorkflowExecution{WorkflowId: common.StringPtr("complete-task-execution-deleted-test"),
392+
RunId: common.StringPtr("0d00698f-08e1-4d36-a3e2-3bf109f5d2d6")}
393+
taskList := "complete-task-execution-deleted-queue"
394+
task0, err0 := s.CreateWorkflowExecution(domainID, workflowExecution, taskList, "wType", 10, nil, 3, 0, 2, nil)
395+
s.Nil(err0, "No error expected.")
396+
s.NotEmpty(task0, "Expected non empty task identifier.")
397+
398+
tasksCh := make(chan *persistence.TransferTaskInfo, 10)
399+
s.processor.processTransferTasks(tasksCh)
400+
workerPump:
401+
for {
402+
select {
403+
case task := <-tasksCh:
404+
if task.ScheduleID == firstEventID+1 {
405+
s.mockVisibilityMgr.On("RecordWorkflowExecutionStarted", mock.Anything).Once().Return(&workflow.EntityNotExistsError{})
406+
}
407+
s.processor.processTransferTask(task)
408+
default:
409+
break workerPump
410+
}
411+
}
412+
413+
s.mockVisibilityMgr.AssertExpectations(s.T())
414+
}
415+
389416
func createAddRequestFromTask(task *persistence.TransferTaskInfo, scheduleToStartTimeout int32) interface{} {
390417
var res interface{}
391418
domainID := task.DomainID

0 commit comments

Comments
 (0)