Skip to content

Commit 4614c23

Browse files
authored
Timer Processor to checkpoint ack level (#233)
* Scheme change to persist timer Ack level. * Shard Context to allocate sequence numbers for timer task ID * Timer Q Processor to maintain ack level. * Update History Engine. * Update ShardContext, Remove timer sequence number. * Checkpoint timestamp up to which timer tasks are processed. * TimerIDs never go below checkpoint ack level. * Logs and fixes. * Remove un-used variables. * go format. * Add unit test for shardContext.TimerAckLevel * rename updateShardInfo -> updateShardInfoLocked * remove shardContext from timerBuilder. * linting. * Add visibility_timestamp to executions schema * Update schema to add visibility_ts * rename visibility_ts * Timer q processor to use timestamp, sequence. * Notify new timer doesn't query until it is current. * linting. * Fix timer mock test. * move Get/Set Visibility Time out of task to server methods. * Move to use mock timer. * linting. * Fix the quick pending timer level. * Adjust batch size.
1 parent 89bae56 commit 4614c23

19 files changed

+784
-448
lines changed

common/persistence/cassandraPersistence.go

Lines changed: 103 additions & 15 deletions
Large diffs are not rendered by default.

common/persistence/cassandraPersistence_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
package persistence
2222

2323
import (
24-
"math"
2524
"os"
2625
"testing"
2726
"time"
@@ -641,18 +640,19 @@ func (s *cassandraPersistenceSuite) TestTimerTasks() {
641640
updatedInfo := copyWorkflowExecutionInfo(info0)
642641
updatedInfo.NextEventID = int64(5)
643642
updatedInfo.LastProcessedEvent = int64(2)
644-
tasks := []Task{&DecisionTimeoutTask{1, 2}}
643+
tasks := []Task{&DecisionTimeoutTask{time.Now(), 1, 2}}
645644
err2 := s.UpdateWorkflowExecution(updatedInfo, []int64{int64(4)}, nil, int64(3), tasks, nil, nil, nil, nil, nil)
646645
s.Nil(err2, "No error expected.")
647646

648-
timerTasks, err1 := s.GetTimerIndexTasks(-1, math.MaxInt64)
647+
timerTasks, err1 := s.GetTimerIndexTasks()
649648
s.Nil(err1, "No error expected.")
650649
s.NotNil(timerTasks, "expected valid list of tasks.")
651650

652-
err2 = s.UpdateWorkflowExecution(updatedInfo, nil, nil, int64(5), nil, &DecisionTimeoutTask{TaskID: timerTasks[0].TaskID}, nil, nil, nil, nil)
651+
deleteTimerTask := &DecisionTimeoutTask{VisibilityTimestamp: timerTasks[0].VisibilityTimestamp, TaskID: timerTasks[0].TaskID}
652+
err2 = s.UpdateWorkflowExecution(updatedInfo, nil, nil, int64(5), nil, deleteTimerTask, nil, nil, nil, nil)
653653
s.Nil(err2, "No error expected.")
654654

655-
timerTasks2, err2 := s.GetTimerIndexTasks(-1, math.MaxInt64)
655+
timerTasks2, err2 := s.GetTimerIndexTasks()
656656
s.Nil(err2, "No error expected.")
657657
s.Empty(timerTasks2, "expected empty task list.")
658658
}

common/persistence/dataInterfaces.go

Lines changed: 54 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ type (
105105
StolenSinceRenew int
106106
UpdatedAt time.Time
107107
TransferAckLevel int64
108+
TimerAckLevel time.Time
108109
}
109110

110111
// WorkflowExecutionInfo describes a workflow execution
@@ -150,13 +151,14 @@ type (
150151

151152
// TimerTaskInfo describes a timer task.
152153
TimerTaskInfo struct {
153-
DomainID string
154-
WorkflowID string
155-
RunID string
156-
TaskID int64
157-
TaskType int
158-
TimeoutType int
159-
EventID int64
154+
DomainID string
155+
WorkflowID string
156+
RunID string
157+
VisibilityTimestamp time.Time
158+
TaskID int64
159+
TaskType int
160+
TimeoutType int
161+
EventID int64
160162
}
161163

162164
// TaskListInfo describes a state of a task list implementation.
@@ -208,8 +210,9 @@ type (
208210

209211
// DecisionTimeoutTask identifies a timeout task.
210212
DecisionTimeoutTask struct {
211-
TaskID int64
212-
EventID int64
213+
VisibilityTimestamp time.Time
214+
TaskID int64
215+
EventID int64
213216
}
214217

215218
// CancelExecutionTask identifies a transfer task for cancel of execution
@@ -231,15 +234,17 @@ type (
231234

232235
// ActivityTimeoutTask identifies a timeout task.
233236
ActivityTimeoutTask struct {
234-
TaskID int64
235-
TimeoutType int
236-
EventID int64
237+
VisibilityTimestamp time.Time
238+
TaskID int64
239+
TimeoutType int
240+
EventID int64
237241
}
238242

239243
// UserTimerTask identifies a timeout task.
240244
UserTimerTask struct {
241-
TaskID int64
242-
EventID int64
245+
VisibilityTimestamp time.Time
246+
TaskID int64
247+
EventID int64
243248
}
244249

245250
// WorkflowMutableState indicates workflow related state
@@ -400,7 +405,8 @@ type (
400405

401406
// CompleteTimerTaskRequest is used to complete a task in the timer task queue
402407
CompleteTimerTaskRequest struct {
403-
TaskID int64
408+
VisibilityTimestamp time.Time
409+
TaskID int64
404410
}
405411

406412
// LeaseTaskListRequest is used to request lease of a task list
@@ -469,9 +475,9 @@ type (
469475
// GetTimerIndexTasksRequest is the request for GetTimerIndexTasks
470476
// TODO: replace this with an iterator that can configure min and max index.
471477
GetTimerIndexTasksRequest struct {
472-
MinKey int64
473-
MaxKey int64
474-
BatchSize int
478+
MinTimestamp time.Time
479+
MaxTimestamp time.Time
480+
BatchSize int
475481
}
476482

477483
// GetTimerIndexTasksResponse is the response for GetTimerIndexTasks
@@ -719,6 +725,16 @@ func (d *DecisionTimeoutTask) SetTaskID(id int64) {
719725
d.TaskID = id
720726
}
721727

728+
// GetVisibilityTimestamp gets the visibility time stamp
729+
func (d *DecisionTimeoutTask) GetVisibilityTimestamp() time.Time {
730+
return d.VisibilityTimestamp
731+
}
732+
733+
// SetVisibilityTimestamp sets the visibility time stamp
734+
func (d *DecisionTimeoutTask) SetVisibilityTimestamp(t time.Time) {
735+
d.VisibilityTimestamp = t
736+
}
737+
722738
// GetType returns the type of the timer task
723739
func (a *ActivityTimeoutTask) GetType() int {
724740
return TaskTypeActivityTimeout
@@ -734,6 +750,16 @@ func (a *ActivityTimeoutTask) SetTaskID(id int64) {
734750
a.TaskID = id
735751
}
736752

753+
// GetVisibilityTimestamp gets the visibility time stamp
754+
func (a *ActivityTimeoutTask) GetVisibilityTimestamp() time.Time {
755+
return a.VisibilityTimestamp
756+
}
757+
758+
// SetVisibilityTimestamp sets the visibility time stamp
759+
func (a *ActivityTimeoutTask) SetVisibilityTimestamp(t time.Time) {
760+
a.VisibilityTimestamp = t
761+
}
762+
737763
// GetType returns the type of the timer task
738764
func (u *UserTimerTask) GetType() int {
739765
return TaskTypeUserTimer
@@ -749,6 +775,16 @@ func (u *UserTimerTask) SetTaskID(id int64) {
749775
u.TaskID = id
750776
}
751777

778+
// GetVisibilityTimestamp gets the visibility time stamp
779+
func (u *UserTimerTask) GetVisibilityTimestamp() time.Time {
780+
return u.VisibilityTimestamp
781+
}
782+
783+
// SetVisibilityTimestamp sets the visibility time stamp
784+
func (u *UserTimerTask) SetVisibilityTimestamp(t time.Time) {
785+
u.VisibilityTimestamp = t
786+
}
787+
752788
// GetType returns the type of the cancel transfer task
753789
func (u *CancelExecutionTask) GetType() int {
754790
return TransferTaskTypeCancelExecution

common/persistence/persistenceTestBase.go

Lines changed: 76 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"math"
2525
"math/rand"
2626
"strings"
27+
"sync"
2728
"sync/atomic"
2829
"time"
2930

@@ -65,7 +66,7 @@ type (
6566
MetadataManager MetadataManager
6667
VisibilityMgr VisibilityManager
6768
ShardInfo *ShardInfo
68-
ShardContext *testShardContext
69+
ShardContext *TestShardContext
6970
readLevel int64
7071
CassandraTestCluster
7172
}
@@ -77,10 +78,12 @@ type (
7778
session *gocql.Session
7879
}
7980

80-
testShardContext struct {
81+
// TestShardContext shard context for testing.
82+
// TODO: Cleanup, move this out of persistence
83+
TestShardContext struct {
84+
sync.RWMutex
8185
shardInfo *ShardInfo
8286
transferSequenceNumber int64
83-
timerSequeceNumber int64
8487
historyMgr HistoryManager
8588
executionMgr ExecutionManager
8689
logger bark.Logger
@@ -95,8 +98,8 @@ type (
9598
)
9699

97100
func newTestShardContext(shardInfo *ShardInfo, transferSequenceNumber int64, historyMgr HistoryManager,
98-
executionMgr ExecutionManager, logger bark.Logger) *testShardContext {
99-
return &testShardContext{
101+
executionMgr ExecutionManager, logger bark.Logger) *TestShardContext {
102+
return &TestShardContext{
100103
shardInfo: shardInfo,
101104
transferSequenceNumber: transferSequenceNumber,
102105
historyMgr: historyMgr,
@@ -106,66 +109,102 @@ func newTestShardContext(shardInfo *ShardInfo, transferSequenceNumber int64, his
106109
}
107110
}
108111

109-
func (s *testShardContext) GetExecutionManager() ExecutionManager {
112+
// GetExecutionManager test implementation
113+
func (s *TestShardContext) GetExecutionManager() ExecutionManager {
110114
return s.executionMgr
111115
}
112116

113-
func (s *testShardContext) GetHistoryManager() HistoryManager {
117+
// GetHistoryManager test implementation
118+
func (s *TestShardContext) GetHistoryManager() HistoryManager {
114119
return s.historyMgr
115120
}
116121

117-
func (s *testShardContext) GetNextTransferTaskID() (int64, error) {
122+
// GetNextTransferTaskID test implementation
123+
func (s *TestShardContext) GetNextTransferTaskID() (int64, error) {
118124
return atomic.AddInt64(&s.transferSequenceNumber, 1), nil
119125
}
120126

121-
func (s *testShardContext) GetTransferMaxReadLevel() int64 {
127+
// GetTransferMaxReadLevel test implementation
128+
func (s *TestShardContext) GetTransferMaxReadLevel() int64 {
122129
return atomic.LoadInt64(&s.transferSequenceNumber)
123130
}
124131

125-
func (s *testShardContext) GetTransferAckLevel() int64 {
132+
// GetTransferAckLevel test implementation
133+
func (s *TestShardContext) GetTransferAckLevel() int64 {
126134
return atomic.LoadInt64(&s.shardInfo.TransferAckLevel)
127135
}
128136

129-
func (s *testShardContext) GetTimerSequenceNumber() int64 {
130-
return atomic.AddInt64(&s.timerSequeceNumber, 1)
131-
}
132-
133-
func (s *testShardContext) UpdateAckLevel(ackLevel int64) error {
137+
// UpdateTransferAckLevel test implementation
138+
func (s *TestShardContext) UpdateTransferAckLevel(ackLevel int64) error {
134139
atomic.StoreInt64(&s.shardInfo.TransferAckLevel, ackLevel)
135140
return nil
136141
}
137142

138-
func (s *testShardContext) GetTransferSequenceNumber() int64 {
143+
// GetTransferSequenceNumber test implementation
144+
func (s *TestShardContext) GetTransferSequenceNumber() int64 {
139145
return atomic.LoadInt64(&s.transferSequenceNumber)
140146
}
141147

142-
func (s *testShardContext) CreateWorkflowExecution(request *CreateWorkflowExecutionRequest) (
148+
// GetTimerAckLevel test implementation
149+
func (s *TestShardContext) GetTimerAckLevel() time.Time {
150+
s.RLock()
151+
defer s.RLock()
152+
return s.shardInfo.TimerAckLevel
153+
}
154+
155+
// UpdateTimerAckLevel test implementation
156+
func (s *TestShardContext) UpdateTimerAckLevel(ackLevel time.Time) error {
157+
s.Lock()
158+
defer s.Unlock()
159+
s.shardInfo.TimerAckLevel = ackLevel
160+
return nil
161+
}
162+
163+
// CreateWorkflowExecution test implementation
164+
func (s *TestShardContext) CreateWorkflowExecution(request *CreateWorkflowExecutionRequest) (
143165
*CreateWorkflowExecutionResponse, error) {
144166
return s.executionMgr.CreateWorkflowExecution(request)
145167
}
146168

147-
func (s *testShardContext) UpdateWorkflowExecution(request *UpdateWorkflowExecutionRequest) error {
169+
// UpdateWorkflowExecution test implementation
170+
func (s *TestShardContext) UpdateWorkflowExecution(request *UpdateWorkflowExecutionRequest) error {
171+
// assign IDs for the timer tasks. They need to be assigned under shard lock.
172+
// TODO: This needs to be moved out of persistence.
173+
for _, task := range request.TimerTasks {
174+
seqID, err := s.GetNextTransferTaskID()
175+
if err != nil {
176+
panic(err)
177+
}
178+
task.SetTaskID(seqID)
179+
s.logger.Infof("%v: TestShardContext: Assigning timer (timestamp: %v, seq: %v)",
180+
time.Now().UTC(), GetVisibilityTSFrom(task).UTC(), task.GetTaskID())
181+
}
148182
return s.executionMgr.UpdateWorkflowExecution(request)
149183
}
150184

151-
func (s *testShardContext) AppendHistoryEvents(request *AppendHistoryEventsRequest) error {
185+
// AppendHistoryEvents test implementation
186+
func (s *TestShardContext) AppendHistoryEvents(request *AppendHistoryEventsRequest) error {
152187
return s.historyMgr.AppendHistoryEvents(request)
153188
}
154189

155-
func (s *testShardContext) GetLogger() bark.Logger {
190+
// GetLogger test implementation
191+
func (s *TestShardContext) GetLogger() bark.Logger {
156192
return s.logger
157193
}
158194

159-
func (s *testShardContext) GetMetricsClient() metrics.Client {
195+
// GetMetricsClient test implementation
196+
func (s *TestShardContext) GetMetricsClient() metrics.Client {
160197
return s.metricsClient
161198
}
162199

163-
func (s *testShardContext) Reset() {
200+
// Reset test implementation
201+
func (s *TestShardContext) Reset() {
164202
atomic.StoreInt64(&s.shardInfo.RangeID, 0)
165203
atomic.StoreInt64(&s.shardInfo.TransferAckLevel, 0)
166204
}
167205

168-
func (s *testShardContext) GetRangeID() int64 {
206+
// GetRangeID test implementation
207+
func (s *TestShardContext) GetRangeID() int64 {
169208
return atomic.LoadInt64(&s.shardInfo.RangeID)
170209
}
171210

@@ -598,9 +637,11 @@ func (s *TestBase) CompleteTransferTask(taskID int64) error {
598637
}
599638

600639
// GetTimerIndexTasks is a utility method to get tasks from timer task queue
601-
func (s *TestBase) GetTimerIndexTasks(minKey int64, maxKey int64) ([]*TimerTaskInfo, error) {
640+
func (s *TestBase) GetTimerIndexTasks() ([]*TimerTaskInfo, error) {
602641
response, err := s.WorkflowMgr.GetTimerIndexTasks(&GetTimerIndexTasksRequest{
603-
MinKey: minKey, MaxKey: maxKey, BatchSize: 10})
642+
MinTimestamp: time.Time{},
643+
MaxTimestamp: time.Unix(0, math.MaxInt64),
644+
BatchSize: 10})
604645

605646
if err != nil {
606647
return nil, err
@@ -609,6 +650,14 @@ func (s *TestBase) GetTimerIndexTasks(minKey int64, maxKey int64) ([]*TimerTaskI
609650
return response.Timers, nil
610651
}
611652

653+
// CompleteTimerTask is a utility method to complete a timer task
654+
func (s *TestBase) CompleteTimerTask(ts time.Time, taskID int64) error {
655+
return s.WorkflowMgr.CompleteTimerTask(&CompleteTimerTaskRequest{
656+
VisibilityTimestamp: ts,
657+
TaskID: taskID,
658+
})
659+
}
660+
612661
// CreateDecisionTask is a utility method to create a task
613662
func (s *TestBase) CreateDecisionTask(domainID string, workflowExecution workflow.WorkflowExecution, taskList string,
614663
decisionScheduleID int64) (int64, error) {
@@ -623,7 +672,7 @@ func (s *TestBase) CreateDecisionTask(domainID string, workflowExecution workflo
623672

624673
taskID := s.GetNextSequenceNumber()
625674
tasks := []*CreateTaskInfo{
626-
&CreateTaskInfo{
675+
{
627676
TaskID: taskID,
628677
Execution: workflowExecution,
629678
Data: &TaskInfo{
@@ -667,7 +716,7 @@ func (s *TestBase) CreateActivityTasks(domainID string, workflowExecution workfl
667716
}
668717
taskID := s.GetNextSequenceNumber()
669718
tasks := []*CreateTaskInfo{
670-
&CreateTaskInfo{
719+
{
671720
TaskID: taskID,
672721
Execution: workflowExecution,
673722
Data: &TaskInfo{

0 commit comments

Comments
 (0)