Skip to content

Commit 02e20f9

Browse files
committed
Unify methods of history tasks in shard component
1 parent 721e13d commit 02e20f9

19 files changed

+391
-500
lines changed

common/persistence/data_manager_interfaces.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,15 @@ type (
438438
CurrentRunID string
439439
}
440440

441+
// FailoverLevel contains corresponding start / end level
442+
FailoverLevel struct {
443+
StartTime time.Time
444+
MinLevel HistoryTaskKey
445+
CurrentLevel HistoryTaskKey
446+
MaxLevel HistoryTaskKey
447+
DomainIDs map[string]struct{}
448+
}
449+
441450
// TransferTaskInfo describes a transfer task
442451
TransferTaskInfo struct {
443452
DomainID string

service/history/decision/handler_test.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ func TestHandleDecisionTaskScheduled(t *testing.T) {
200200
Times(1).
201201
Return(&types.HistoryEvent{}, nil)
202202
shardContext.EXPECT().GetShardID().Return(testShardID).Times(1)
203-
shardContext.EXPECT().GenerateTransferTaskIDs(gomock.Any()).Times(1).Return([]int64{}, errors.New("some random error to avoid going too deep in call stack unrelated to this unit"))
203+
shardContext.EXPECT().GenerateTaskIDs(gomock.Any()).Times(1).Return([]int64{}, errors.New("some random error to avoid going too deep in call stack unrelated to this unit"))
204204
},
205205
expectErr: true,
206206
isfirstDecision: true,
@@ -222,7 +222,7 @@ func TestHandleDecisionTaskScheduled(t *testing.T) {
222222
Times(1).
223223
Return(&types.HistoryEvent{}, nil)
224224
shardContext.EXPECT().GetShardID().Return(testShardID).Times(1)
225-
shardContext.EXPECT().GenerateTransferTaskIDs(gomock.Any()).Times(1).Return([]int64{}, errors.New("some random error to avoid going too deep in call stack unrelated to this unit"))
225+
shardContext.EXPECT().GenerateTaskIDs(gomock.Any()).Times(1).Return([]int64{}, errors.New("some random error to avoid going too deep in call stack unrelated to this unit"))
226226
},
227227
expectErr: true,
228228
isfirstDecision: true,
@@ -301,7 +301,7 @@ func TestHandleDecisionTaskFailed(t *testing.T) {
301301
}
302302
h.tokenSerializer.(*common.MockTaskTokenSerializer).EXPECT().Deserialize(taskToken).Return(token, nil)
303303
h.shard.(*shard.MockContext).EXPECT().GetEventsCache().Times(1).Return(events.NewMockCache(ctrl))
304-
h.shard.(*shard.MockContext).EXPECT().GenerateTransferTaskIDs(gomock.Any()).Return([]int64{0}, nil)
304+
h.shard.(*shard.MockContext).EXPECT().GenerateTaskIDs(gomock.Any()).Return([]int64{0}, nil)
305305
h.shard.(*shard.MockContext).EXPECT().AppendHistoryV2Events(gomock.Any(), gomock.Any(), constants.TestDomainID, types.WorkflowExecution{
306306
WorkflowID: constants.TestWorkflowID,
307307
RunID: constants.TestRunID,
@@ -494,7 +494,7 @@ func TestHandleDecisionTaskStarted(t *testing.T) {
494494
domainID: constants.TestDomainID,
495495
expectCalls: func(ctrl *gomock.Controller, h *handlerImpl) {
496496
h.shard.(*shard.MockContext).EXPECT().GetEventsCache().Times(1).Return(events.NewMockCache(ctrl))
497-
h.shard.(*shard.MockContext).EXPECT().GenerateTransferTaskIDs(gomock.Any()).Times(1).Return([]int64{0}, nil)
497+
h.shard.(*shard.MockContext).EXPECT().GenerateTaskIDs(gomock.Any()).Times(1).Return([]int64{0}, nil)
498498
h.shard.(*shard.MockContext).EXPECT().
499499
AppendHistoryV2Events(gomock.Any(), gomock.Any(), constants.TestDomainID, types.WorkflowExecution{WorkflowID: constants.TestWorkflowID, RunID: constants.TestRunID}).
500500
Return(&persistence.AppendHistoryNodesResponse{}, nil)
@@ -639,8 +639,8 @@ func TestHandleDecisionTaskCompleted(t *testing.T) {
639639
decisionHandler.shard.(*shard.MockContext).EXPECT().GetEventsCache().Times(1).Return(eventsCache)
640640
eventsCache.EXPECT().PutEvent(constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID, int64(1), gomock.Any())
641641
decisionHandler.shard.(*shard.MockContext).EXPECT().GetShardID().Times(1).Return(testShardID)
642-
decisionHandler.shard.(*shard.MockContext).EXPECT().GenerateTransferTaskIDs(4).Return([]int64{0, 1, 2, 3}, nil)
643-
decisionHandler.shard.(*shard.MockContext).EXPECT().GenerateTransferTaskIDs(6).Return([]int64{0, 1, 2, 3, 4, 5}, nil)
642+
decisionHandler.shard.(*shard.MockContext).EXPECT().GenerateTaskIDs(4).Return([]int64{0, 1, 2, 3}, nil)
643+
decisionHandler.shard.(*shard.MockContext).EXPECT().GenerateTaskIDs(6).Return([]int64{0, 1, 2, 3, 4, 5}, nil)
644644
decisionHandler.shard.(*shard.MockContext).EXPECT().AppendHistoryV2Events(gomock.Any(), gomock.Any(), constants.TestDomainID, types.WorkflowExecution{
645645
WorkflowID: constants.TestWorkflowID,
646646
RunID: constants.TestRunID,
@@ -769,7 +769,7 @@ func TestHandleDecisionTaskCompleted(t *testing.T) {
769769
eventsCache := events.NewMockCache(ctrl)
770770
decisionHandler.shard.(*shard.MockContext).EXPECT().GetEventsCache().Times(1).Return(eventsCache)
771771
decisionHandler.shard.(*shard.MockContext).EXPECT().GetShardID().Times(1).Return(testShardID)
772-
decisionHandler.shard.(*shard.MockContext).EXPECT().GenerateTransferTaskIDs(1).Return([]int64{0}, nil)
772+
decisionHandler.shard.(*shard.MockContext).EXPECT().GenerateTaskIDs(1).Return([]int64{0}, nil)
773773
decisionHandler.shard.(*shard.MockContext).EXPECT().AppendHistoryV2Events(gomock.Any(), gomock.Any(), constants.TestDomainID, types.WorkflowExecution{
774774
WorkflowID: constants.TestWorkflowID,
775775
RunID: constants.TestRunID,
@@ -822,7 +822,7 @@ func TestHandleDecisionTaskCompleted(t *testing.T) {
822822
eventsCache.EXPECT().GetEvent(context.Background(), testShardID, constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID, commonconstants.FirstEventID, commonconstants.FirstEventID, nil).Return(&types.HistoryEvent{}, nil)
823823
eventsCache.EXPECT().PutEvent(constants.TestDomainID, constants.TestWorkflowID, gomock.Any(), int64(1), gomock.Any()).Times(2)
824824
decisionHandler.shard.(*shard.MockContext).EXPECT().GetShardID().Times(1).Return(testShardID)
825-
decisionHandler.shard.(*shard.MockContext).EXPECT().GenerateTransferTaskIDs(2).Times(1).Return([]int64{0, 1}, nil)
825+
decisionHandler.shard.(*shard.MockContext).EXPECT().GenerateTaskIDs(2).Times(1).Return([]int64{0, 1}, nil)
826826
decisionHandler.shard.(*shard.MockContext).EXPECT().AppendHistoryV2Events(gomock.Any(), gomock.Any(), constants.TestDomainID, gomock.Any()).Return(nil, &persistence.TransactionSizeLimitError{Msg: fmt.Sprintf("transaction size exceeds limit")})
827827
decisionHandler.shard.(*shard.MockContext).EXPECT().GetExecutionManager().Times(1)
828828
},
@@ -863,7 +863,7 @@ func TestHandleDecisionTaskCompleted(t *testing.T) {
863863
}, nil).Times(3)
864864
eventsCache.EXPECT().PutEvent(constants.TestDomainID, constants.TestWorkflowID, gomock.Any(), int64(1), gomock.Any()).Times(2)
865865
decisionHandler.shard.(*shard.MockContext).EXPECT().GetShardID().Times(3).Return(testShardID)
866-
decisionHandler.shard.(*shard.MockContext).EXPECT().GenerateTransferTaskIDs(2).Times(1).Return([]int64{0, 1}, nil)
866+
decisionHandler.shard.(*shard.MockContext).EXPECT().GenerateTaskIDs(2).Times(1).Return([]int64{0, 1}, nil)
867867
decisionHandler.shard.(*shard.MockContext).EXPECT().AppendHistoryV2Events(gomock.Any(), gomock.Any(), constants.TestDomainID, gomock.Any()).Return(nil, execution.NewConflictError(new(testing.T), errors.New("some random conflict error")))
868868
decisionHandler.shard.(*shard.MockContext).EXPECT().GetExecutionManager().Times(1)
869869
},
@@ -904,7 +904,7 @@ func TestHandleDecisionTaskCompleted(t *testing.T) {
904904
}, nil).Times(3)
905905
eventsCache.EXPECT().PutEvent(constants.TestDomainID, constants.TestWorkflowID, gomock.Any(), int64(1), gomock.Any()).Times(2)
906906
decisionHandler.shard.(*shard.MockContext).EXPECT().GetShardID().Times(3).Return(testShardID)
907-
decisionHandler.shard.(*shard.MockContext).EXPECT().GenerateTransferTaskIDs(2).Times(1).Return([]int64{0, 1}, nil)
907+
decisionHandler.shard.(*shard.MockContext).EXPECT().GenerateTaskIDs(2).Times(1).Return([]int64{0, 1}, nil)
908908
decisionHandler.shard.(*shard.MockContext).EXPECT().AppendHistoryV2Events(gomock.Any(), gomock.Any(), constants.TestDomainID, gomock.Any()).Return(nil, &persistence.TransactionSizeLimitError{Msg: fmt.Sprintf("transaction size exceeds limit")})
909909
decisionHandler.shard.(*shard.MockContext).EXPECT().GetExecutionManager().Times(1)
910910
firstGetWfExecutionCall := decisionHandler.shard.(*shard.MockContext).EXPECT().GetWorkflowExecution(context.Background(), gomock.Any()).
@@ -955,8 +955,8 @@ func TestHandleDecisionTaskCompleted(t *testing.T) {
955955
}, nil).Times(3)
956956
eventsCache.EXPECT().PutEvent(constants.TestDomainID, constants.TestWorkflowID, gomock.Any(), int64(1), gomock.Any()).Times(3)
957957
decisionHandler.shard.(*shard.MockContext).EXPECT().GetShardID().Times(3).Return(testShardID)
958-
decisionHandler.shard.(*shard.MockContext).EXPECT().GenerateTransferTaskIDs(2).Times(2).Return([]int64{0, 1}, nil)
959-
decisionHandler.shard.(*shard.MockContext).EXPECT().GenerateTransferTaskIDs(1).Times(1).Return([]int64{0}, nil)
958+
decisionHandler.shard.(*shard.MockContext).EXPECT().GenerateTaskIDs(2).Times(2).Return([]int64{0, 1}, nil)
959+
decisionHandler.shard.(*shard.MockContext).EXPECT().GenerateTaskIDs(1).Times(1).Return([]int64{0}, nil)
960960
decisionHandler.shard.(*shard.MockContext).EXPECT().AppendHistoryV2Events(gomock.Any(), gomock.Any(), constants.TestDomainID, gomock.Any()).Return(nil, &persistence.TransactionSizeLimitError{Msg: fmt.Sprintf("transaction size exceeds limit")})
961961
decisionHandler.shard.(*shard.MockContext).EXPECT().AppendHistoryV2Events(gomock.Any(), gomock.Any(), constants.TestDomainID, gomock.Any()).Return(&persistence.AppendHistoryNodesResponse{}, nil)
962962
decisionHandler.shard.(*shard.MockContext).EXPECT().GetExecutionManager().Times(1)
@@ -999,7 +999,7 @@ func TestHandleDecisionTaskCompleted(t *testing.T) {
999999
eventsCache := events.NewMockCache(ctrl)
10001000
decisionHandler.shard.(*shard.MockContext).EXPECT().GetEventsCache().Times(3).Return(eventsCache)
10011001
eventsCache.EXPECT().PutEvent(constants.TestDomainID, constants.TestWorkflowID, constants.TestRunID, int64(0), gomock.Any())
1002-
decisionHandler.shard.(*shard.MockContext).EXPECT().GenerateTransferTaskIDs(1).Return([]int64{0}, nil)
1002+
decisionHandler.shard.(*shard.MockContext).EXPECT().GenerateTaskIDs(1).Return([]int64{0}, nil)
10031003
decisionHandler.shard.(*shard.MockContext).EXPECT().AppendHistoryV2Events(gomock.Any(), gomock.Any(), constants.TestDomainID, gomock.Any()).Return(nil, errors.New("some error updating continue as new info"))
10041004
domainEntry := cache.NewLocalDomainCacheEntryForTest(
10051005
&persistence.DomainInfo{ID: constants.TestDomainID, Name: constants.TestDomainName},
@@ -1146,7 +1146,7 @@ func TestHandleDecisionTaskCompleted(t *testing.T) {
11461146
decisionHandler.tokenSerializer.(*common.MockTaskTokenSerializer).EXPECT().Deserialize(serializedTestToken).Return(deserializedTestToken, nil)
11471147
decisionHandler.shard.(*shard.MockContext).EXPECT().GetEventsCache().Times(1).Return(events.NewMockCache(ctrl))
11481148
decisionHandler.shard.(*shard.MockContext).EXPECT().GetShardID().Times(1).Return(testShardID)
1149-
decisionHandler.shard.(*shard.MockContext).EXPECT().GenerateTransferTaskIDs(3).Return([]int64{0, 1, 2}, nil)
1149+
decisionHandler.shard.(*shard.MockContext).EXPECT().GenerateTaskIDs(3).Return([]int64{0, 1, 2}, nil)
11501150
decisionHandler.shard.(*shard.MockContext).EXPECT().AppendHistoryV2Events(gomock.Any(), gomock.Any(), constants.TestDomainID, types.WorkflowExecution{
11511151
WorkflowID: constants.TestWorkflowID,
11521152
RunID: constants.TestRunID,

service/history/execution/mutable_state_builder.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -798,7 +798,7 @@ func (e *mutableStateBuilder) assignTaskIDToEvents() error {
798798
// first transient events
799799
numTaskIDs := len(e.hBuilder.transientHistory)
800800
if numTaskIDs > 0 {
801-
taskIDs, err := e.shard.GenerateTransferTaskIDs(numTaskIDs)
801+
taskIDs, err := e.shard.GenerateTaskIDs(numTaskIDs)
802802
if err != nil {
803803
return err
804804
}
@@ -815,7 +815,7 @@ func (e *mutableStateBuilder) assignTaskIDToEvents() error {
815815
// then normal events
816816
numTaskIDs = len(e.hBuilder.history)
817817
if numTaskIDs > 0 {
818-
taskIDs, err := e.shard.GenerateTransferTaskIDs(numTaskIDs)
818+
taskIDs, err := e.shard.GenerateTaskIDs(numTaskIDs)
819819
if err != nil {
820820
return err
821821
}

service/history/execution/mutable_state_builder_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3139,7 +3139,7 @@ func TestAssignTaskIDToTransientHistoryEvents(t *testing.T) {
31393139
},
31403140
taskID: 123,
31413141
shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) {
3142-
shardContext.EXPECT().GenerateTransferTaskIDs(1).Return([]int64{123}, nil).Times(1)
3142+
shardContext.EXPECT().GenerateTaskIDs(1).Return([]int64{123}, nil).Times(1)
31433143
},
31443144
expectedEvents: []*types.HistoryEvent{
31453145
{
@@ -3164,7 +3164,7 @@ func TestAssignTaskIDToTransientHistoryEvents(t *testing.T) {
31643164
},
31653165
taskID: 456,
31663166
shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) {
3167-
shardContext.EXPECT().GenerateTransferTaskIDs(2).Return([]int64{123, 124}, nil).Times(1)
3167+
shardContext.EXPECT().GenerateTaskIDs(2).Return([]int64{123, 124}, nil).Times(1)
31683168
},
31693169
expectedEvents: []*types.HistoryEvent{
31703170
{
@@ -3196,7 +3196,7 @@ func TestAssignTaskIDToTransientHistoryEvents(t *testing.T) {
31963196
},
31973197
taskID: 456,
31983198
shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) {
3199-
shardContext.EXPECT().GenerateTransferTaskIDs(1).Return(nil, assert.AnError).Times(1)
3199+
shardContext.EXPECT().GenerateTaskIDs(1).Return(nil, assert.AnError).Times(1)
32003200
},
32013201
expectedEvents: []*types.HistoryEvent{
32023202
{
@@ -3252,7 +3252,7 @@ func TestAssignTaskIDToHistoryEvents(t *testing.T) {
32523252
},
32533253
taskID: 123,
32543254
shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) {
3255-
shardContext.EXPECT().GenerateTransferTaskIDs(1).Return([]int64{123}, nil).Times(1)
3255+
shardContext.EXPECT().GenerateTaskIDs(1).Return([]int64{123}, nil).Times(1)
32563256
},
32573257
expectedEvents: []*types.HistoryEvent{
32583258
{
@@ -3277,7 +3277,7 @@ func TestAssignTaskIDToHistoryEvents(t *testing.T) {
32773277
},
32783278
taskID: 456,
32793279
shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) {
3280-
shardContext.EXPECT().GenerateTransferTaskIDs(2).Return([]int64{123, 124}, nil).Times(1)
3280+
shardContext.EXPECT().GenerateTaskIDs(2).Return([]int64{123, 124}, nil).Times(1)
32813281
},
32823282
expectedEvents: []*types.HistoryEvent{
32833283
{
@@ -3309,7 +3309,7 @@ func TestAssignTaskIDToHistoryEvents(t *testing.T) {
33093309
},
33103310
taskID: 456,
33113311
shardContextExpectations: func(mockCache *events.MockCache, shardContext *shardCtx.MockContext, mockDomainCache *cache.MockDomainCache) {
3312-
shardContext.EXPECT().GenerateTransferTaskIDs(1).Return(nil, assert.AnError).Times(1)
3312+
shardContext.EXPECT().GenerateTaskIDs(1).Return(nil, assert.AnError).Times(1)
33133313
},
33143314
expectedEvents: []*types.HistoryEvent{
33153315
{
@@ -3535,7 +3535,7 @@ func TestCloseTransactionAsMutation(t *testing.T) {
35353535
MaximumBufferedEventsBatch: func(...dynamicproperties.FilterOption) int { return 100 },
35363536
}).Times(3)
35373537

3538-
shardContext.EXPECT().GenerateTransferTaskIDs(1).Return([]int64{123}, nil).Times(1)
3538+
shardContext.EXPECT().GenerateTaskIDs(1).Return([]int64{123}, nil).Times(1)
35393539
shardContext.EXPECT().GetDomainCache().Return(mockDomainCache).Times(1)
35403540
mockDomainCache.EXPECT().GetDomainByID("some-domain-id").Return(mockDomain, nil)
35413541

service/history/queue/timer_queue_active_processor.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,9 @@ func newTimerQueueActiveProcessor(
6666
}
6767

6868
updateClusterAckLevel := func(ackLevel task.Key) error {
69-
return shard.UpdateTimerClusterAckLevel(clusterName, ackLevel.(timerTaskKey).visibilityTimestamp)
69+
return shard.UpdateQueueClusterAckLevel(persistence.HistoryTaskCategoryTimer, clusterName, persistence.HistoryTaskKey{
70+
ScheduledTime: ackLevel.(timerTaskKey).visibilityTimestamp,
71+
})
7072
}
7173

7274
updateProcessingQueueStates := func(states []ProcessingQueueState) error {

service/history/queue/timer_queue_failover_processor.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -77,20 +77,30 @@ func newTimerQueueFailoverProcessor(
7777
}
7878

7979
updateClusterAckLevel := func(ackLevel task.Key) error {
80-
return shardContext.UpdateTimerFailoverLevel(
80+
return shardContext.UpdateFailoverLevel(
81+
persistence.HistoryTaskCategoryTimer,
8182
failoverUUID,
82-
shard.TimerFailoverLevel{
83-
StartTime: failoverStartTime,
84-
MinLevel: minLevel,
85-
CurrentLevel: ackLevel.(timerTaskKey).visibilityTimestamp,
86-
MaxLevel: maxLevel,
87-
DomainIDs: domainIDs,
83+
persistence.FailoverLevel{
84+
StartTime: failoverStartTime,
85+
MinLevel: persistence.HistoryTaskKey{
86+
ScheduledTime: minLevel,
87+
},
88+
CurrentLevel: persistence.HistoryTaskKey{
89+
ScheduledTime: ackLevel.(timerTaskKey).visibilityTimestamp,
90+
},
91+
MaxLevel: persistence.HistoryTaskKey{
92+
ScheduledTime: maxLevel,
93+
},
94+
DomainIDs: domainIDs,
8895
},
8996
)
9097
}
9198

9299
queueShutdown := func() error {
93-
return shardContext.DeleteTimerFailoverLevel(failoverUUID)
100+
return shardContext.DeleteFailoverLevel(
101+
persistence.HistoryTaskCategoryTimer,
102+
failoverUUID,
103+
)
94104
}
95105

96106
processingQueueStates := []ProcessingQueueState{

service/history/queue/timer_queue_processor.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ func NewTimerQueueProcessor(
157157
status: common.DaemonStatusInitialized,
158158
shutdownChan: make(chan struct{}),
159159

160-
ackLevel: shard.GetTimerAckLevel(),
160+
ackLevel: shard.GetQueueAckLevel(persistence.HistoryTaskCategoryTimer).ScheduledTime,
161161
taskAllocator: taskAllocator,
162162
activeTaskExecutor: activeTaskExecutor,
163163
activeQueueProcessor: activeQueueProcessor,
@@ -275,10 +275,10 @@ func (t *timerQueueProcessor) FailoverDomain(domainIDs map[string]struct{}) {
275275
return
276276
}
277277

278-
minLevel := t.shard.GetTimerClusterAckLevel(t.currentClusterName)
278+
minLevel := t.shard.GetQueueClusterAckLevel(persistence.HistoryTaskCategoryTimer, t.currentClusterName).ScheduledTime
279279
standbyClusterName := t.currentClusterName
280280
for clusterName := range t.shard.GetClusterMetadata().GetEnabledClusterInfo() {
281-
ackLevel := t.shard.GetTimerClusterAckLevel(clusterName)
281+
ackLevel := t.shard.GetQueueClusterAckLevel(persistence.HistoryTaskCategoryTimer, clusterName).ScheduledTime
282282
if ackLevel.Before(minLevel) {
283283
minLevel = ackLevel
284284
standbyClusterName = clusterName
@@ -493,8 +493,8 @@ func (t *timerQueueProcessor) completeTimer(ctx context.Context) error {
493493
}
494494
}
495495

496-
for _, failoverInfo := range t.shard.GetAllTimerFailoverLevels() {
497-
failoverLevel := newTimerTaskKey(failoverInfo.MinLevel, 0)
496+
for _, failoverInfo := range t.shard.GetAllFailoverLevels(persistence.HistoryTaskCategoryTimer) {
497+
failoverLevel := newTimerTaskKey(failoverInfo.MinLevel.ScheduledTime, 0)
498498
newAckLevel = minTaskKey(newAckLevel, failoverLevel)
499499
}
500500

@@ -539,7 +539,9 @@ func (t *timerQueueProcessor) completeTimer(ctx context.Context) error {
539539

540540
t.ackLevel = newAckLevelTimestamp
541541

542-
return t.shard.UpdateTimerAckLevel(t.ackLevel)
542+
return t.shard.UpdateQueueAckLevel(persistence.HistoryTaskCategoryTimer, persistence.HistoryTaskKey{
543+
ScheduledTime: t.ackLevel,
544+
})
543545
}
544546

545547
func loadTimerProcessingQueueStates(
@@ -548,7 +550,7 @@ func loadTimerProcessingQueueStates(
548550
options *queueProcessorOptions,
549551
logger log.Logger,
550552
) []ProcessingQueueState {
551-
ackLevel := shard.GetTimerClusterAckLevel(clusterName)
553+
ackLevel := shard.GetQueueClusterAckLevel(persistence.HistoryTaskCategoryTimer, clusterName).ScheduledTime
552554
if options.EnableLoadQueueStates() {
553555
pStates := shard.GetTimerProcessingQueueStates(clusterName)
554556
if validateProcessingQueueStates(pStates, ackLevel) {

service/history/queue/timer_queue_standby_processor.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,9 @@ func newTimerQueueStandbyProcessor(
8484
}
8585

8686
updateClusterAckLevel := func(ackLevel task.Key) error {
87-
return shard.UpdateTimerClusterAckLevel(clusterName, ackLevel.(timerTaskKey).visibilityTimestamp)
87+
return shard.UpdateQueueClusterAckLevel(persistence.HistoryTaskCategoryTimer, clusterName, persistence.HistoryTaskKey{
88+
ScheduledTime: ackLevel.(timerTaskKey).visibilityTimestamp,
89+
})
8890
}
8991

9092
updateProcessingQueueStates := func(states []ProcessingQueueState) error {

0 commit comments

Comments
 (0)