Skip to content

Commit 3560320

Browse files
author
Tamer Eldeeb
authored
Restrict clustering key in taskList rangeID check (#292)
The CreateTasks query used to write task list tasks to persistence includes a CAS check that verifies that the rangeID has not changed. The rangeID is a static field in the tasks table, and Cassandra only accepts the partition keys (but not the other clustering keys) in the CAS query to read and update it. This unrestricted query however causes problems when there is a big backlog in the task list as the coordinator automatically turns it into a LIMIT 1 query. To fix this we restrict the query with clustering keys, and rewrite non-static fields so that Cassandra accepts the query restrictions. Issue #277
1 parent 3ac1e5a commit 3560320

File tree

5 files changed

+40
-42
lines changed

5 files changed

+40
-42
lines changed

common/persistence/cassandraPersistence.go

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ const (
7171
)
7272

7373
const (
74-
taskListTaskID = -12345 // for debugging
75-
initialRangeID = 1 // Id of the first range of a new task list
74+
taskListTaskID = -12345
75+
initialRangeID = 1 // Id of the first range of a new task list
7676
)
7777

7878
const (
@@ -477,13 +477,6 @@ const (
477477
`and type = ? ` +
478478
`and task_id = ? ` +
479479
`IF range_id = ?`
480-
481-
templateUpdateTaskListRangeOnlyQuery = `UPDATE tasks SET ` +
482-
`range_id = ? ` +
483-
`WHERE domain_id = ? ` +
484-
`and task_list_name = ? ` +
485-
`and task_list_type = ? ` +
486-
`IF range_id = ?`
487480
)
488481

489482
var (
@@ -1196,7 +1189,7 @@ func (d *cassandraPersistence) LeaseTaskList(request *LeaseTaskListRequest) (*Le
11961189
Msg: fmt.Sprintf("LeaseTaskList failed to apply. db rangeID %v", previousRangeID),
11971190
}
11981191
}
1199-
tli := &TaskListInfo{Name: request.TaskList, TaskType: request.TaskType, RangeID: rangeID + 1, AckLevel: ackLevel}
1192+
tli := &TaskListInfo{DomainID: request.DomainID, Name: request.TaskList, TaskType: request.TaskType, RangeID: rangeID + 1, AckLevel: ackLevel}
12001193
return &LeaseTaskListResponse{TaskListInfo: tli}, nil
12011194
}
12021195

@@ -1244,9 +1237,10 @@ func (d *cassandraPersistence) UpdateTaskList(request *UpdateTaskListRequest) (*
12441237
// From TaskManager interface
12451238
func (d *cassandraPersistence) CreateTasks(request *CreateTasksRequest) (*CreateTasksResponse, error) {
12461239
batch := d.session.NewBatch(gocql.LoggedBatch)
1247-
domainID := request.DomainID
1248-
taskList := request.TaskList
1249-
taskListType := request.TaskListType
1240+
domainID := request.TaskListInfo.DomainID
1241+
taskList := request.TaskListInfo.Name
1242+
taskListType := request.TaskListInfo.TaskType
1243+
ackLevel := request.TaskListInfo.AckLevel
12501244

12511245
for _, task := range request.Tasks {
12521246
scheduleID := task.Data.ScheduleID
@@ -1277,13 +1271,20 @@ func (d *cassandraPersistence) CreateTasks(request *CreateTasksRequest) (*Create
12771271
}
12781272

12791273
// The following query is used to ensure that range_id didn't change
1280-
batch.Query(templateUpdateTaskListRangeOnlyQuery,
1281-
request.RangeID,
1274+
batch.Query(templateUpdateTaskListQuery,
1275+
request.TaskListInfo.RangeID,
12821276
domainID,
12831277
taskList,
12841278
taskListType,
1285-
request.RangeID,
1279+
ackLevel,
1280+
domainID,
1281+
taskList,
1282+
taskListType,
1283+
rowTypeTaskList,
1284+
taskListTaskID,
1285+
request.TaskListInfo.RangeID,
12861286
)
1287+
12871288
previous := make(map[string]interface{})
12881289
applied, _, err := d.session.MapExecuteBatchCAS(batch, previous)
12891290
if err != nil {
@@ -1295,7 +1296,7 @@ func (d *cassandraPersistence) CreateTasks(request *CreateTasksRequest) (*Create
12951296
rangeID := previous["range_id"]
12961297
return nil, &ConditionFailedError{
12971298
Msg: fmt.Sprintf("Failed to create task. TaskList: %v, taskListType: %v, rangeID: %v, db rangeID: %v",
1298-
taskList, taskListType, request.RangeID, rangeID),
1299+
taskList, taskListType, request.TaskListInfo.RangeID, rangeID),
12991300
}
13001301
}
13011302

common/persistence/dataInterfaces.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -460,10 +460,7 @@ type (
460460

461461
// CreateTasksRequest is used to create a new task for a workflow exectution
462462
CreateTasksRequest struct {
463-
DomainID string
464-
TaskList string
465-
TaskListType int
466-
RangeID int64
463+
TaskListInfo *TaskListInfo
467464
Tasks []*CreateTaskInfo
468465
}
469466

common/persistence/persistenceTestBase.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -723,11 +723,8 @@ func (s *TestBase) CreateDecisionTask(domainID string, workflowExecution workflo
723723
}
724724

725725
_, err = s.TaskMgr.CreateTasks(&CreateTasksRequest{
726-
DomainID: domainID,
727-
TaskList: taskList,
728-
TaskListType: TaskListTypeDecision,
726+
TaskListInfo: leaseResponse.TaskListInfo,
729727
Tasks: tasks,
730-
RangeID: leaseResponse.TaskListInfo.RangeID,
731728
})
732729

733730
if err != nil {
@@ -766,11 +763,8 @@ func (s *TestBase) CreateActivityTasks(domainID string, workflowExecution workfl
766763
},
767764
}
768765
_, err := s.TaskMgr.CreateTasks(&CreateTasksRequest{
769-
DomainID: domainID,
770-
TaskList: taskList,
771-
TaskListType: TaskListTypeActivity,
766+
TaskListInfo: leaseResponse.TaskListInfo,
772767
Tasks: tasks,
773-
RangeID: leaseResponse.TaskListInfo.RangeID,
774768
})
775769

776770
if err != nil {

service/matching/matchingEngine_test.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1281,28 +1281,29 @@ func (m *testTaskManager) CompleteTask(request *persistence.CompleteTaskRequest)
12811281

12821282
// CreateTask provides a mock function with given fields: request
12831283
func (m *testTaskManager) CreateTasks(request *persistence.CreateTasksRequest) (*persistence.CreateTasksResponse, error) {
1284-
domainID := request.DomainID
1285-
taskList := request.TaskList
1286-
taskType := request.TaskListType
1284+
domainID := request.TaskListInfo.DomainID
1285+
taskList := request.TaskListInfo.Name
1286+
taskType := request.TaskListInfo.TaskType
1287+
rangeID := request.TaskListInfo.RangeID
12871288

12881289
tlm := m.getTaskListManager(newTaskListID(domainID, taskList, taskType))
12891290
tlm.Lock()
12901291
defer tlm.Unlock()
12911292

12921293
// First validate the entire batch
12931294
for _, task := range request.Tasks {
1294-
m.logger.Debugf("testTaskManager.CreateTask taskID=%v, rangeID=%v", task.TaskID, request.RangeID)
1295+
m.logger.Debugf("testTaskManager.CreateTask taskID=%v, rangeID=%v", task.TaskID, rangeID)
12951296
if task.TaskID <= 0 {
12961297
panic(fmt.Errorf("Invalid taskID=%v", task.TaskID))
12971298
}
12981299

1299-
if tlm.rangeID != request.RangeID {
1300+
if tlm.rangeID != rangeID {
13001301
m.logger.Debugf("testTaskManager.CreateTask ConditionFailedError taskID=%v, rangeID: %v, db rangeID: %v",
1301-
task.TaskID, request.RangeID, tlm.rangeID)
1302+
task.TaskID, rangeID, tlm.rangeID)
13021303

13031304
return nil, &persistence.ConditionFailedError{
13041305
Msg: fmt.Sprintf("testTaskManager.CreateTask failed. TaskList: %v, taskType: %v, rangeID: %v, db rangeID: %v",
1305-
taskList, taskType, request.RangeID, tlm.rangeID),
1306+
taskList, taskType, rangeID, tlm.rangeID),
13061307
}
13071308
}
13081309
_, ok := tlm.tasks.Get(task.TaskID)

service/matching/taskWriter.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -159,15 +159,20 @@ writerLoop:
159159
maxReadLevel = taskIDs[i]
160160
}
161161

162+
tlInfo := &persistence.TaskListInfo{
163+
DomainID: w.taskListID.domainID,
164+
Name: w.taskListID.taskListName,
165+
TaskType: w.taskListID.taskType,
166+
// Note that newTaskID could increment range, so rangeID parameter
167+
// might be out of sync. This is OK as caller can just retry.
168+
RangeID: rangeID,
169+
AckLevel: w.tlMgr.getAckLevel(),
170+
}
171+
162172
w.tlMgr.persistenceLock.Lock()
163173
r, err := w.taskManager.CreateTasks(&persistence.CreateTasksRequest{
164-
DomainID: w.taskListID.domainID,
165-
TaskList: w.taskListID.taskListName,
166-
TaskListType: w.taskListID.taskType,
174+
TaskListInfo: tlInfo,
167175
Tasks: tasks,
168-
// Note that newTaskID could increment range, so rangeID parameter
169-
// might be out of sync. This is OK as caller can just retry.
170-
RangeID: rangeID,
171176
})
172177
w.tlMgr.persistenceLock.Unlock()
173178

0 commit comments

Comments
 (0)