Skip to content

Commit 2f42f12

Browse files
committed
Update redispatcher to respect task redispatch time
1 parent 8ce4f09 commit 2f42f12

File tree

9 files changed

+86
-127
lines changed

9 files changed

+86
-127
lines changed

common/dynamicconfig/constants.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2106,11 +2106,6 @@ const (
21062106

21072107
// key for history
21082108

2109-
// TaskRedispatchIntervalJitterCoefficient is the task redispatch interval jitter coefficient
2110-
// KeyName: history.taskRedispatchIntervalJitterCoefficient
2111-
// Value type: Float64
2112-
// Default value: 0.15
2113-
// Allowed filters: N/A
21142109
TaskRedispatchIntervalJitterCoefficient
21152110
// QueueProcessorRandomSplitProbability is the probability for a domain to be split to a new processing queue
21162111
// KeyName: history.queueProcessorRandomSplitProbability
@@ -4432,7 +4427,7 @@ var FloatKeys = map[FloatKey]DynamicFloat{
44324427
},
44334428
TaskRedispatchIntervalJitterCoefficient: {
44344429
KeyName: "history.taskRedispatchIntervalJitterCoefficient",
4435-
Description: "TaskRedispatchIntervalJitterCoefficient is the task redispatch interval jitter coefficient",
4430+
Description: "Deprecated",
44364431
DefaultValue: 0.15,
44374432
},
44384433
QueueProcessorRandomSplitProbability: {

service/history/config/config.go

Lines changed: 28 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -89,21 +89,20 @@ type Config struct {
8989
StandbyTaskMissingEventsDiscardDelay dynamicconfig.DurationPropertyFn
9090

9191
// Task process settings
92-
TaskProcessRPS dynamicconfig.IntPropertyFnWithDomainFilter
93-
TaskSchedulerType dynamicconfig.IntPropertyFn
94-
TaskSchedulerWorkerCount dynamicconfig.IntPropertyFn
95-
TaskSchedulerShardWorkerCount dynamicconfig.IntPropertyFn
96-
TaskSchedulerQueueSize dynamicconfig.IntPropertyFn
97-
TaskSchedulerShardQueueSize dynamicconfig.IntPropertyFn
98-
TaskSchedulerDispatcherCount dynamicconfig.IntPropertyFn
99-
TaskSchedulerRoundRobinWeights dynamicconfig.MapPropertyFn
100-
TaskCriticalRetryCount dynamicconfig.IntPropertyFn
101-
ActiveTaskRedispatchInterval dynamicconfig.DurationPropertyFn
102-
StandbyTaskRedispatchInterval dynamicconfig.DurationPropertyFn
103-
TaskRedispatchIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
104-
StandbyTaskReReplicationContextTimeout dynamicconfig.DurationPropertyFnWithDomainIDFilter
105-
EnableDropStuckTaskByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter
106-
ResurrectionCheckMinDelay dynamicconfig.DurationPropertyFnWithDomainFilter
92+
TaskProcessRPS dynamicconfig.IntPropertyFnWithDomainFilter
93+
TaskSchedulerType dynamicconfig.IntPropertyFn
94+
TaskSchedulerWorkerCount dynamicconfig.IntPropertyFn
95+
TaskSchedulerShardWorkerCount dynamicconfig.IntPropertyFn
96+
TaskSchedulerQueueSize dynamicconfig.IntPropertyFn
97+
TaskSchedulerShardQueueSize dynamicconfig.IntPropertyFn
98+
TaskSchedulerDispatcherCount dynamicconfig.IntPropertyFn
99+
TaskSchedulerRoundRobinWeights dynamicconfig.MapPropertyFn
100+
TaskCriticalRetryCount dynamicconfig.IntPropertyFn
101+
ActiveTaskRedispatchInterval dynamicconfig.DurationPropertyFn
102+
StandbyTaskRedispatchInterval dynamicconfig.DurationPropertyFn
103+
StandbyTaskReReplicationContextTimeout dynamicconfig.DurationPropertyFnWithDomainIDFilter
104+
EnableDropStuckTaskByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter
105+
ResurrectionCheckMinDelay dynamicconfig.DurationPropertyFnWithDomainFilter
107106

108107
// QueueProcessor settings
109108
QueueProcessorEnableSplit dynamicconfig.BoolPropertyFn
@@ -368,21 +367,20 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, i
368367
DeleteHistoryEventContextTimeout: dc.GetIntProperty(dynamicconfig.DeleteHistoryEventContextTimeout),
369368
MaxResponseSize: maxMessageSize,
370369

371-
TaskProcessRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.TaskProcessRPS),
372-
TaskSchedulerType: dc.GetIntProperty(dynamicconfig.TaskSchedulerType),
373-
TaskSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.TaskSchedulerWorkerCount),
374-
TaskSchedulerShardWorkerCount: dc.GetIntProperty(dynamicconfig.TaskSchedulerShardWorkerCount),
375-
TaskSchedulerQueueSize: dc.GetIntProperty(dynamicconfig.TaskSchedulerQueueSize),
376-
TaskSchedulerShardQueueSize: dc.GetIntProperty(dynamicconfig.TaskSchedulerShardQueueSize),
377-
TaskSchedulerDispatcherCount: dc.GetIntProperty(dynamicconfig.TaskSchedulerDispatcherCount),
378-
TaskSchedulerRoundRobinWeights: dc.GetMapProperty(dynamicconfig.TaskSchedulerRoundRobinWeights),
379-
TaskCriticalRetryCount: dc.GetIntProperty(dynamicconfig.TaskCriticalRetryCount),
380-
ActiveTaskRedispatchInterval: dc.GetDurationProperty(dynamicconfig.ActiveTaskRedispatchInterval),
381-
StandbyTaskRedispatchInterval: dc.GetDurationProperty(dynamicconfig.StandbyTaskRedispatchInterval),
382-
TaskRedispatchIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TaskRedispatchIntervalJitterCoefficient),
383-
StandbyTaskReReplicationContextTimeout: dc.GetDurationPropertyFilteredByDomainID(dynamicconfig.StandbyTaskReReplicationContextTimeout),
384-
EnableDropStuckTaskByDomainID: dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.EnableDropStuckTaskByDomainID),
385-
ResurrectionCheckMinDelay: dc.GetDurationPropertyFilteredByDomain(dynamicconfig.ResurrectionCheckMinDelay),
370+
TaskProcessRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.TaskProcessRPS),
371+
TaskSchedulerType: dc.GetIntProperty(dynamicconfig.TaskSchedulerType),
372+
TaskSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.TaskSchedulerWorkerCount),
373+
TaskSchedulerShardWorkerCount: dc.GetIntProperty(dynamicconfig.TaskSchedulerShardWorkerCount),
374+
TaskSchedulerQueueSize: dc.GetIntProperty(dynamicconfig.TaskSchedulerQueueSize),
375+
TaskSchedulerShardQueueSize: dc.GetIntProperty(dynamicconfig.TaskSchedulerShardQueueSize),
376+
TaskSchedulerDispatcherCount: dc.GetIntProperty(dynamicconfig.TaskSchedulerDispatcherCount),
377+
TaskSchedulerRoundRobinWeights: dc.GetMapProperty(dynamicconfig.TaskSchedulerRoundRobinWeights),
378+
TaskCriticalRetryCount: dc.GetIntProperty(dynamicconfig.TaskCriticalRetryCount),
379+
ActiveTaskRedispatchInterval: dc.GetDurationProperty(dynamicconfig.ActiveTaskRedispatchInterval),
380+
StandbyTaskRedispatchInterval: dc.GetDurationProperty(dynamicconfig.StandbyTaskRedispatchInterval),
381+
StandbyTaskReReplicationContextTimeout: dc.GetDurationPropertyFilteredByDomainID(dynamicconfig.StandbyTaskReReplicationContextTimeout),
382+
EnableDropStuckTaskByDomainID: dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.EnableDropStuckTaskByDomainID),
383+
ResurrectionCheckMinDelay: dc.GetDurationPropertyFilteredByDomain(dynamicconfig.ResurrectionCheckMinDelay),
386384

387385
QueueProcessorEnableSplit: dc.GetBoolProperty(dynamicconfig.QueueProcessorEnableSplit),
388386
QueueProcessorSplitMaxLevel: dc.GetIntProperty(dynamicconfig.QueueProcessorSplitMaxLevel),

service/history/config/config_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,6 @@ func TestNewConfig(t *testing.T) {
103103
"TaskCriticalRetryCount": {dynamicconfig.TaskCriticalRetryCount, 37},
104104
"ActiveTaskRedispatchInterval": {dynamicconfig.ActiveTaskRedispatchInterval, time.Second},
105105
"StandbyTaskRedispatchInterval": {dynamicconfig.StandbyTaskRedispatchInterval, time.Second},
106-
"TaskRedispatchIntervalJitterCoefficient": {dynamicconfig.TaskRedispatchIntervalJitterCoefficient, 1.0},
107106
"StandbyTaskReReplicationContextTimeout": {dynamicconfig.StandbyTaskReReplicationContextTimeout, time.Second},
108107
"EnableDropStuckTaskByDomainID": {dynamicconfig.EnableDropStuckTaskByDomainID, true},
109108
"ResurrectionCheckMinDelay": {dynamicconfig.ResurrectionCheckMinDelay, time.Second},

service/history/queue/processor_base.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,7 @@ func newProcessorBase(
102102
taskProcessor,
103103
shard.GetTimeSource(),
104104
&task.RedispatcherOptions{
105-
TaskRedispatchInterval: options.RedispatchInterval,
106-
TaskRedispatchIntervalJitterCoefficient: options.RedispatchIntervalJitterCoefficient,
105+
TaskRedispatchInterval: options.RedispatchInterval,
107106
},
108107
logger,
109108
metricsScope,

service/history/queue/processor_options.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ type queueProcessorOptions struct {
3131
UpdateAckInterval dynamicconfig.DurationPropertyFn
3232
UpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
3333
RedispatchInterval dynamicconfig.DurationPropertyFn
34-
RedispatchIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
3534
MaxRedispatchQueueSize dynamicconfig.IntPropertyFn
3635
MaxStartJitterInterval dynamicconfig.DurationPropertyFn
3736
SplitQueueInterval dynamicconfig.DurationPropertyFn

service/history/queue/timer_queue_processor_base.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -672,7 +672,6 @@ func newTimerQueueProcessorOptions(
672672
MaxPollIntervalJitterCoefficient: config.TimerProcessorMaxPollIntervalJitterCoefficient,
673673
UpdateAckInterval: config.TimerProcessorUpdateAckInterval,
674674
UpdateAckIntervalJitterCoefficient: config.TimerProcessorUpdateAckIntervalJitterCoefficient,
675-
RedispatchIntervalJitterCoefficient: config.TaskRedispatchIntervalJitterCoefficient,
676675
MaxRedispatchQueueSize: config.TimerProcessorMaxRedispatchQueueSize,
677676
SplitQueueInterval: config.TimerProcessorSplitQueueInterval,
678677
SplitQueueIntervalJitterCoefficient: config.TimerProcessorSplitQueueIntervalJitterCoefficient,

service/history/queue/transfer_queue_processor_base.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,6 @@ func newTransferQueueProcessorOptions(
573573
MaxPollIntervalJitterCoefficient: config.TransferProcessorMaxPollIntervalJitterCoefficient,
574574
UpdateAckInterval: config.TransferProcessorUpdateAckInterval,
575575
UpdateAckIntervalJitterCoefficient: config.TransferProcessorUpdateAckIntervalJitterCoefficient,
576-
RedispatchIntervalJitterCoefficient: config.TaskRedispatchIntervalJitterCoefficient,
577576
MaxRedispatchQueueSize: config.TransferProcessorMaxRedispatchQueueSize,
578577
SplitQueueInterval: config.TransferProcessorSplitQueueInterval,
579578
SplitQueueIntervalJitterCoefficient: config.TransferProcessorSplitQueueIntervalJitterCoefficient,

service/history/task/redispatcher.go

Lines changed: 23 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -50,27 +50,24 @@ type (
5050

5151
// RedispatcherOptions configs redispatch interval
5252
RedispatcherOptions struct {
53-
TaskRedispatchInterval dynamicconfig.DurationPropertyFn
54-
TaskRedispatchIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
53+
TaskRedispatchInterval dynamicconfig.DurationPropertyFn
5554
}
56-
5755
redispatcherImpl struct {
5856
sync.Mutex
5957

6058
taskProcessor Processor
6159
timeSource clock.TimeSource
62-
options *RedispatcherOptions
6360
logger log.Logger
6461
metricsScope metrics.Scope
6562

66-
status int32
67-
shutdownCh chan struct{}
68-
shutdownWG sync.WaitGroup
69-
redispatchCh chan redispatchNotification
70-
redispatchTimer *time.Timer
71-
backoffPolicy backoff.RetryPolicy
72-
pqMap map[int]collection.Queue[redispatchTask] // priority -> redispatch queue
73-
taskChFull map[int]bool // priority -> if taskCh is full
63+
status int32
64+
shutdownCh chan struct{}
65+
shutdownWG sync.WaitGroup
66+
redispatchCh chan redispatchNotification
67+
timerGate clock.TimerGate
68+
backoffPolicy backoff.RetryPolicy
69+
pqMap map[int]collection.Queue[redispatchTask] // priority -> redispatch queue
70+
taskChFull map[int]bool // priority -> if taskCh is full
7471
}
7572

7673
redispatchTask struct {
@@ -95,12 +92,12 @@ func NewRedispatcher(
9592
return &redispatcherImpl{
9693
taskProcessor: taskProcessor,
9794
timeSource: timeSource,
98-
options: options,
9995
logger: logger,
10096
metricsScope: metricsScope,
10197
status: common.DaemonStatusInitialized,
10298
shutdownCh: make(chan struct{}),
10399
redispatchCh: make(chan redispatchNotification, 1),
100+
timerGate: clock.NewTimerGate(timeSource),
104101
backoffPolicy: backoffPolicy,
105102
pqMap: make(map[int]collection.Queue[redispatchTask]),
106103
taskChFull: make(map[int]bool),
@@ -124,14 +121,7 @@ func (r *redispatcherImpl) Stop() {
124121
}
125122

126123
close(r.shutdownCh)
127-
128-
r.Lock()
129-
if r.redispatchTimer != nil {
130-
r.redispatchTimer.Stop()
131-
}
132-
r.redispatchTimer = nil
133-
r.Unlock()
134-
124+
r.timerGate.Stop()
135125
if success := common.AwaitWaitGroup(&r.shutdownWG, time.Minute); !success {
136126
r.logger.Warn("Task redispatcher timedout on shutdown.", tag.LifeCycleStopTimedout)
137127
}
@@ -144,16 +134,18 @@ func (r *redispatcherImpl) AddTask(task Task) {
144134
attempt := task.GetAttempt()
145135

146136
r.Lock()
147-
defer r.Unlock()
148137
pq := r.getOrCreatePQLocked(priority)
138+
t := r.getRedispatchTime(attempt)
149139
pq.Add(redispatchTask{
150140
task: task,
151-
redispatchTime: r.getRedispatchTime(attempt),
141+
redispatchTime: t,
152142
})
143+
r.Unlock()
153144

154-
r.setupTimerLocked()
145+
r.timerGate.Update(t)
155146
}
156147

148+
// TODO: review this method, it doesn't seem to redispatch the tasks immediately
157149
func (r *redispatcherImpl) Redispatch(targetSize int) {
158150
doneCh := make(chan struct{})
159151
ntf := redispatchNotification{
@@ -185,6 +177,8 @@ func (r *redispatcherImpl) redispatchLoop() {
185177
select {
186178
case <-r.shutdownCh:
187179
return
180+
case <-r.timerGate.Chan():
181+
r.redispatchTasks(redispatchNotification{})
188182
case notification := <-r.redispatchCh:
189183
r.redispatchTasks(notification)
190184
}
@@ -199,10 +193,6 @@ func (r *redispatcherImpl) redispatchTasks(notification redispatchNotification)
199193
if notification.doneCh != nil {
200194
close(notification.doneCh)
201195
}
202-
if r.sizeLocked() > 0 {
203-
// there are still tasks left in the queue, setup a redispatch timer for those tasks
204-
r.setupTimerLocked()
205-
}
206196
}()
207197

208198
if r.isStopped() {
@@ -261,44 +251,21 @@ func (r *redispatcherImpl) redispatchTasks(notification redispatchNotification)
261251
totalRedispatched++
262252
}
263253
}
254+
if !pq.IsEmpty() {
255+
item, _ := pq.Peek()
256+
r.timerGate.Update(item.redispatchTime)
257+
}
264258
if r.isStopped() {
265259
return
266260
}
267261
}
268262
}
269263

270-
func (r *redispatcherImpl) setupTimerLocked() {
271-
if r.redispatchTimer != nil || r.isStopped() {
272-
return
273-
}
274-
275-
r.redispatchTimer = time.AfterFunc(
276-
backoff.JitDuration(
277-
r.options.TaskRedispatchInterval(),
278-
r.options.TaskRedispatchIntervalJitterCoefficient(),
279-
),
280-
func() {
281-
r.Lock()
282-
defer r.Unlock()
283-
r.redispatchTimer = nil
284-
285-
select {
286-
case r.redispatchCh <- redispatchNotification{
287-
targetSize: 0,
288-
doneCh: nil,
289-
}:
290-
default:
291-
}
292-
},
293-
)
294-
}
295-
296264
func (r *redispatcherImpl) sizeLocked() int {
297265
size := 0
298266
for _, queue := range r.pqMap {
299267
size += queue.Len()
300268
}
301-
302269
return size
303270
}
304271

@@ -308,7 +275,7 @@ func (r *redispatcherImpl) isStopped() bool {
308275

309276
func (r *redispatcherImpl) getRedispatchTime(attempt int) time.Time {
310277
// note that elapsedTime (the first parameter) is not relevant when
311-
// the retry policy has not expiration interval
278+
// the retry policy has no expiration interval
312279
return r.timeSource.Now().Add(r.backoffPolicy.ComputeNextDelay(0, attempt))
313280
}
314281

0 commit comments

Comments
 (0)