Skip to content

Commit 6364be5

Browse files
authored
Consolidate and minor fix on retry behavior for GetHistoryTasks operation (#6957)
1 parent d8e2690 commit 6364be5

File tree

4 files changed

+20
-14
lines changed

4 files changed

+20
-14
lines changed

service/history/queue/timer_queue_processor_base.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ var (
4848
time.Unix(0, math.MaxInt64),
4949
0,
5050
)
51+
timerTaskOperationRetryPolicy = common.CreatePersistenceRetryPolicy()
5152
)
5253

5354
type (
@@ -518,19 +519,24 @@ func (t *timerQueueProcessorBase) getTimerTasks(readLevel, maxReadLevel task.Key
518519
PageSize: batchSize,
519520
NextPageToken: nextPageToken,
520521
}
521-
var err error
522+
522523
var response *persistence.GetHistoryTasksResponse
523-
retryCount := t.shard.GetConfig().TimerProcessorGetFailureRetryCount()
524-
for attempt := 0; attempt < retryCount; attempt++ {
525-
response, err = t.shard.GetExecutionManager().GetHistoryTasks(context.Background(), request)
526-
if err == nil {
527-
return response, nil
528-
}
529-
backoff := time.Duration(attempt*100) * time.Millisecond
530-
t.logger.Debugf("Failed to get timer tasks from execution manager. error: %v, attempt: %d, retryCount: %d, backoff: %v", err, attempt, retryCount, backoff)
531-
time.Sleep(backoff)
524+
op := func(ctx context.Context) error {
525+
var err error
526+
response, err = t.shard.GetExecutionManager().GetHistoryTasks(ctx, request)
527+
return err
528+
}
529+
530+
throttleRetry := backoff.NewThrottleRetry(
531+
backoff.WithRetryPolicy(timerTaskOperationRetryPolicy),
532+
backoff.WithRetryableError(func(err error) bool { return true }),
533+
)
534+
err := throttleRetry.Do(context.Background(), op)
535+
if err != nil {
536+
return nil, err
532537
}
533-
return nil, err
538+
539+
return response, nil
534540
}
535541

536542
func (t *timerQueueProcessorBase) isProcessNow(expiryTime time.Time) bool {

service/history/queue/timer_queue_processor_base_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,7 @@ func (s *timerQueueProcessorBaseSuite) TestReadAndFilterTasks_LookAheadFailed_No
495495

496496
mockExecutionMgr := s.mockShard.Resource.ExecutionMgr
497497
mockExecutionMgr.On("GetHistoryTasks", mock.Anything, request).Return(response, nil).Once()
498-
mockExecutionMgr.On("GetHistoryTasks", mock.Anything, lookAheadRequest).Return(nil, errors.New("some random error")).Times(s.mockShard.GetConfig().TimerProcessorGetFailureRetryCount())
498+
mockExecutionMgr.On("GetHistoryTasks", mock.Anything, lookAheadRequest).Return(nil, errors.New("some random error")).Times(0)
499499

500500
timerQueueProcessBase, done := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil)
501501
defer done()

service/history/queue/transfer_queue_processor_base.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,7 @@ func (t *transferQueueProcessorBase) readTasks(
528528
var response *persistence.GetHistoryTasksResponse
529529
op := func(ctx context.Context) error {
530530
var err error
531-
response, err = t.shard.GetExecutionManager().GetHistoryTasks(context.Background(), &persistence.GetHistoryTasksRequest{
531+
response, err = t.shard.GetExecutionManager().GetHistoryTasks(ctx, &persistence.GetHistoryTasksRequest{
532532
TaskCategory: persistence.HistoryTaskCategoryTransfer,
533533
InclusiveMinTaskKey: persistence.NewImmediateTaskKey(readLevel.(transferTaskKey).taskID + 1),
534534
ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(maxReadLevel.(transferTaskKey).taskID + 1),

service/history/queue/transfer_queue_processor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ func TestTransferQueueProcessor_FailoverDomain(t *testing.T) {
208208
},
209209
},
210210
}
211-
mockShard.GetExecutionManager().(*mocks.ExecutionManager).On("GetHistoryTasks", context.Background(), mock.Anything).Return(response, nil).Once()
211+
mockShard.GetExecutionManager().(*mocks.ExecutionManager).On("GetHistoryTasks", mock.Anything, mock.Anything).Return(response, nil).Once()
212212
},
213213
processorStarted: true,
214214
},

0 commit comments

Comments
 (0)