Skip to content

Commit c8180ad

Browse files
authored
Limit the result of GetTimerTasks query (#195)
Implementation of GetTimerTasks query was sending an unbounded query to server and then iterating over the result to only get bounded items from it. This ended up in generating a large amount of load on the server where it has to return very large result set to the client as the number of timer tasks increased in the system. Updated the query to bound the result to limited number of tasks when issueing query to server, instead of filtering on the client.
1 parent 6f6cdfe commit c8180ad

File tree

1 file changed

+5
-14
lines changed

1 file changed

+5
-14
lines changed

common/persistence/cassandraPersistence.go

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ const (
359359
`and workflow_id = ?` +
360360
`and run_id = ?` +
361361
`and task_id >= ?` +
362-
`and task_id < ?`
362+
`and task_id < ? LIMIT ?`
363363

364364
templateCompleteTimerTaskQuery = `DELETE FROM executions ` +
365365
`WHERE shard_id = ? ` +
@@ -1298,7 +1298,8 @@ func (d *cassandraPersistence) CompleteTask(request *CompleteTaskRequest) error
12981298
return nil
12991299
}
13001300

1301-
func (d *cassandraPersistence) GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error) {
1301+
func (d *cassandraPersistence) GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse,
1302+
error) {
13021303
// Reading timer tasks need to be quorum level consistent, otherwise we could loose task
13031304
query := d.session.Query(templateGetTimerTasksQuery,
13041305
d.shardID,
@@ -1307,7 +1308,8 @@ func (d *cassandraPersistence) GetTimerIndexTasks(request *GetTimerIndexTasksReq
13071308
rowTypeTimerWorkflowID,
13081309
rowTypeTimerRunID,
13091310
request.MinKey,
1310-
request.MaxKey)
1311+
request.MaxKey,
1312+
request.BatchSize)
13111313

13121314
iter := query.Iter()
13131315
if iter == nil {
@@ -1318,23 +1320,12 @@ func (d *cassandraPersistence) GetTimerIndexTasks(request *GetTimerIndexTasksReq
13181320

13191321
response := &GetTimerIndexTasksResponse{}
13201322
task := make(map[string]interface{})
1321-
PopulateTasks:
13221323
for iter.MapScan(task) {
13231324
t := createTimerTaskInfo(task["timer"].(map[string]interface{}))
13241325
// Reset task map to get it ready for next scan
13251326
task = make(map[string]interface{})
1326-
// Skip the task if it is not in the bounds.
1327-
if t.TaskID < request.MinKey {
1328-
continue
1329-
}
1330-
if t.TaskID >= request.MaxKey {
1331-
break PopulateTasks
1332-
}
13331327

13341328
response.Timers = append(response.Timers, t)
1335-
if len(response.Timers) == request.BatchSize {
1336-
break PopulateTasks
1337-
}
13381329
}
13391330

13401331
if err := iter.Close(); err != nil {

0 commit comments

Comments
 (0)