Skip to content

Commit fd9a01a

Browse files
authored
Adjust scan intervals for transfer queue processor (#356)
1 parent 27d483c commit fd9a01a

File tree

3 files changed

+32
-6
lines changed

3 files changed

+32
-6
lines changed

service/history/service.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,15 @@ type Config struct {
5353
TimerProcessorUpdateFailureRetryCount int
5454
TimerProcessorGetFailureRetryCount int
5555
TimerProcessorUpdateAckInterval time.Duration
56+
TimerProcessorForceUpdateInterval time.Duration
5657

5758
// TransferQueueProcessor settings
58-
TransferTaskBatchSize int
59-
TransferProcessorMaxPollRPS int
60-
TransferProcessorMaxPollInterval time.Duration
61-
TransferProcessorUpdateAckInterval time.Duration
62-
TransferTaskWorkerCount int
59+
TransferTaskBatchSize int
60+
TransferProcessorMaxPollRPS int
61+
TransferProcessorMaxPollInterval time.Duration
62+
TransferProcessorUpdateAckInterval time.Duration
63+
TransferProcessorForceUpdateInterval time.Duration
64+
TransferTaskWorkerCount int
6365
}
6466

6567
// NewConfig returns new service config with default values
@@ -79,10 +81,12 @@ func NewConfig(numberOfShards int) *Config {
7981
TimerProcessorUpdateFailureRetryCount: 5,
8082
TimerProcessorGetFailureRetryCount: 5,
8183
TimerProcessorUpdateAckInterval: 10 * time.Second,
84+
TimerProcessorForceUpdateInterval: 10 * time.Minute,
8285
TransferTaskBatchSize: 10,
8386
TransferProcessorMaxPollRPS: 100,
84-
TransferProcessorMaxPollInterval: 10 * time.Second,
87+
TransferProcessorMaxPollInterval: 60 * time.Second,
8588
TransferProcessorUpdateAckInterval: 10 * time.Second,
89+
TransferProcessorForceUpdateInterval: 10 * time.Minute,
8690
TransferTaskWorkerCount: 10,
8791
}
8892
}

service/history/timerQueueProcessor.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ type (
8181
readLevel SequenceID
8282
ackLevel time.Time
8383
metricsClient metrics.Client
84+
lastUpdated time.Time
85+
config *Config
8486
}
8587
)
8688

@@ -890,6 +892,7 @@ func (t timerTaskIDs) Less(i, j int) bool {
890892
func newTimerAckMgr(processor *timerQueueProcessorImpl, shard ShardContext, executionMgr persistence.ExecutionManager,
891893
logger bark.Logger) *timerAckMgr {
892894
ackLevel := shard.GetTimerAckLevel()
895+
config := shard.GetConfig()
893896
return &timerAckMgr{
894897
processor: processor,
895898
shard: shard,
@@ -899,6 +902,8 @@ func newTimerAckMgr(processor *timerQueueProcessorImpl, shard ShardContext, exec
899902
ackLevel: ackLevel,
900903
metricsClient: processor.metricsClient,
901904
logger: logger,
905+
lastUpdated: time.Now(),
906+
config: config,
902907
}
903908
}
904909

@@ -957,6 +962,7 @@ func (t *timerAckMgr) completeTimerTask(taskID SequenceID) {
957962

958963
func (t *timerAckMgr) updateAckLevel() {
959964
t.metricsClient.IncCounter(metrics.TimerQueueProcessorScope, metrics.AckLevelUpdateCounter)
965+
initialAckLevel := t.ackLevel
960966
updatedAckLevel := t.ackLevel
961967
t.Lock()
962968

@@ -982,6 +988,11 @@ MoveAckLevelLoop:
982988
}
983989
t.Unlock()
984990

991+
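// Skip the ack level update if nothing changed, until the force update interval has elapsed (NOTE(review): the `>` comparison below looks inverted for this intent — confirm)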
992+
if initialAckLevel == updatedAckLevel && time.Since(t.lastUpdated) > t.config.TimerProcessorForceUpdateInterval {
993+
return
994+
}
995+
985996
t.logger.Debugf("Updating timer ack level: %v", updatedAckLevel)
986997

987998
// Always update ackLevel to detect if the shard is stolen

service/history/transferQueueProcessor.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ type (
7373
executionMgr persistence.ExecutionManager
7474
logger bark.Logger
7575
metricsClient metrics.Client
76+
lastUpdated time.Time
77+
config *Config
7678

7779
sync.RWMutex
7880
outstandingTasks map[int64]bool
@@ -112,6 +114,7 @@ func newTransferQueueProcessor(shard ShardContext, visibilityMgr persistence.Vis
112114
func newAckManager(processor *transferQueueProcessorImpl, shard ShardContext, executionMgr persistence.ExecutionManager,
113115
logger bark.Logger, metricsClient metrics.Client) *ackManager {
114116
ackLevel := shard.GetTransferAckLevel()
117+
config := shard.GetConfig()
115118
return &ackManager{
116119
processor: processor,
117120
shard: shard,
@@ -121,6 +124,8 @@ func newAckManager(processor *transferQueueProcessorImpl, shard ShardContext, ex
121124
ackLevel: ackLevel,
122125
logger: logger,
123126
metricsClient: metricsClient,
127+
lastUpdated: time.Now(),
128+
config: config,
124129
}
125130
}
126131

@@ -912,6 +917,7 @@ func (a *ackManager) completeTask(taskID int64) {
912917

913918
func (a *ackManager) updateAckLevel() {
914919
a.metricsClient.IncCounter(metrics.TransferQueueProcessorScope, metrics.AckLevelUpdateCounter)
920+
initialAckLevel := a.ackLevel
915921
updatedAckLevel := a.ackLevel
916922
a.Lock()
917923
MoveAckLevelLoop:
@@ -935,6 +941,11 @@ MoveAckLevelLoop:
935941
}
936942
a.Unlock()
937943

944+
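// Skip the ack level update if nothing changed, until the force update interval has elapsed (NOTE(review): the `>` comparison below looks inverted for this intent — confirm)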
945+
if initialAckLevel == updatedAckLevel && time.Since(a.lastUpdated) > a.config.TransferProcessorForceUpdateInterval {
946+
return
947+
}
948+
938949
// Always update ackLevel to detect if the shard is stolen
939950
if err := a.shard.UpdateTransferAckLevel(updatedAckLevel); err != nil {
940951
a.metricsClient.IncCounter(metrics.TransferQueueProcessorScope, metrics.AckLevelUpdateFailedCounter)

0 commit comments

Comments
 (0)