Skip to content

Commit 04f123f

Browse files
authored
Break retries for expired tasks if task cannot be completed by task completer if it is not started (#6626)
* Break retries for expired tasks if task cannot be completed by task completer if it is not started
1 parent 8ce4f09 commit 04f123f

File tree

2 files changed

+45
-2
lines changed

2 files changed

+45
-2
lines changed

service/matching/tasklist/task_reader.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -503,9 +503,9 @@ func (tr *taskReader) dispatchSingleTaskFromBuffer(taskInfo *persistence.TaskInf
503503
}
504504

505505
if errors.Is(err, errTaskNotStarted) {
506-
e.EventName = "Dispatch failed on completing task on the passive side because task not started. Will retry dispatch"
506+
e.EventName = "Dispatch failed on completing task on the passive side because task not started. Will retry dispatch if task is not expired"
507507
event.Log(e)
508-
return false, false
508+
return false, tr.isTaskExpired(taskInfo)
509509
}
510510

511511
if errors.Is(err, errWaitTimeNotReachedForEntityNotExists) {

service/matching/tasklist/task_reader_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ func TestDispatchSingleTaskFromBuffer(t *testing.T) {
5252
testCases := []struct {
5353
name string
5454
allowances func(t *testing.T, reader *taskReader)
55+
ttl int
5556
breakDispatch bool
5657
breakRetries bool
5758
}{
@@ -174,6 +175,47 @@ func TestDispatchSingleTaskFromBuffer(t *testing.T) {
174175
breakDispatch: false,
175176
breakRetries: false,
176177
},
178+
{
179+
name: "Error - task not started and not expired, should retry",
180+
allowances: func(t *testing.T, reader *taskReader) {
181+
reader.getIsolationGroupForTask = func(ctx context.Context, info *persistence.TaskInfo) (string, time.Duration, error) {
182+
return defaultIsolationGroup, -1, nil
183+
}
184+
reader.dispatchTask = func(ctx context.Context, task *InternalTask) error {
185+
return errTaskNotStarted
186+
}
187+
},
188+
ttl: 100,
189+
breakDispatch: false,
190+
breakRetries: false,
191+
},
192+
{
193+
name: "Error - task not started and expired, should not retry",
194+
allowances: func(t *testing.T, reader *taskReader) {
195+
reader.getIsolationGroupForTask = func(ctx context.Context, info *persistence.TaskInfo) (string, time.Duration, error) {
196+
return defaultIsolationGroup, -1, nil
197+
}
198+
reader.dispatchTask = func(ctx context.Context, task *InternalTask) error {
199+
return errTaskNotStarted
200+
}
201+
},
202+
ttl: -2,
203+
breakDispatch: false,
204+
breakRetries: true,
205+
},
206+
{
207+
name: "Error - time not reached to complete task without workflow execution, should retry",
208+
allowances: func(t *testing.T, reader *taskReader) {
209+
reader.getIsolationGroupForTask = func(ctx context.Context, info *persistence.TaskInfo) (string, time.Duration, error) {
210+
return defaultIsolationGroup, -1, nil
211+
}
212+
reader.dispatchTask = func(ctx context.Context, task *InternalTask) error {
213+
return errWaitTimeNotReachedForEntityNotExists
214+
}
215+
},
216+
breakDispatch: false,
217+
breakRetries: false,
218+
},
177219
}
178220
for _, tc := range testCases {
179221
t.Run(tc.name, func(t *testing.T) {
@@ -184,6 +226,7 @@ func TestDispatchSingleTaskFromBuffer(t *testing.T) {
184226
reader := tlm.taskReader
185227
tc.allowances(t, reader)
186228
taskInfo := newTask(timeSource)
229+
taskInfo.Expiry = timeSource.Now().Add(time.Duration(tc.ttl) * time.Second)
187230

188231
breakDispatch, breakRetries := reader.dispatchSingleTaskFromBuffer(taskInfo)
189232
assert.Equal(t, tc.breakDispatch, breakDispatch)

0 commit comments

Comments
 (0)