Skip to content

Commit df9ac3e

Browse files
author
Tamer Eldeeb
authored
Properly handle throttling errors from Cassandra (#316)
Handle overload errors returned from Cassandra differently from timeouts and internal service errors. This is because in the case of throttling errors we know that the query never executed, so we do not have to invoke the cache consistency maintenance part. Issue #300
1 parent 215baf5 commit df9ac3e

File tree

8 files changed

+157
-22
lines changed

8 files changed

+157
-22
lines changed

common/metrics/defs.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,7 @@ const (
447447
PersistenceErrShardOwnershipLostCounter
448448
PersistenceErrConditionFailedCounter
449449
PersistenceErrTimeoutCounter
450+
PersistenceErrBusyCounter
450451

451452
NumCommonMetrics
452453
)
@@ -523,6 +524,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
523524
PersistenceErrShardOwnershipLostCounter: {metricName: "persistence.errors.shard-ownership-lost", metricType: Counter},
524525
PersistenceErrConditionFailedCounter: {metricName: "persistence.errors.condition-failed", metricType: Counter},
525526
PersistenceErrTimeoutCounter: {metricName: "persistence.errors.timeout", metricType: Counter},
527+
PersistenceErrBusyCounter: {metricName: "persistence.errors.busy", metricType: Counter},
526528
},
527529
Frontend: {},
528530
History: {

common/persistence/cassandraHistoryPersistence.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,11 @@ func (h *cassandraHistoryPersistence) AppendHistoryEvents(request *AppendHistory
115115
previous := make(map[string]interface{})
116116
applied, err := query.MapScanCAS(previous)
117117
if err != nil {
118-
if _, ok := err.(*gocql.RequestErrWriteTimeout); ok {
118+
if isThrottlingError(err) {
119+
return &workflow.ServiceBusyError{
120+
Message: fmt.Sprintf("AppendHistoryEvents operation failed. Error: %v", err),
121+
}
122+
} else if isTimeoutError(err) {
119123
// Write may have succeeded, but we don't know
120124
// return this info to the caller so they have the option of trying to find out by executing a read
121125
return &TimeoutError{Msg: fmt.Sprintf("AppendHistoryEvents timed out. Error: %v", err)}
@@ -189,6 +193,11 @@ func (h *cassandraHistoryPersistence) DeleteWorkflowExecutionHistory(
189193

190194
err := query.Exec()
191195
if err != nil {
196+
if isThrottlingError(err) {
197+
return &workflow.ServiceBusyError{
198+
Message: fmt.Sprintf("DeleteWorkflowExecutionHistory operation failed. Error: %v", err),
199+
}
200+
}
192201
return &workflow.InternalServiceError{
193202
Message: fmt.Sprintf("DeleteWorkflowExecutionHistory operation failed. Error: %v", err),
194203
}

common/persistence/cassandraPersistence.go

Lines changed: 85 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,11 @@ func (d *cassandraPersistence) CreateShard(request *CreateShardRequest) error {
595595
previous := make(map[string]interface{})
596596
applied, err := query.MapScanCAS(previous)
597597
if err != nil {
598+
if isThrottlingError(err) {
599+
return &workflow.ServiceBusyError{
600+
Message: fmt.Sprintf("CreateShard operation failed. Error: %v", err),
601+
}
602+
}
598603
return &workflow.InternalServiceError{
599604
Message: fmt.Sprintf("CreateShard operation failed. Error : %v", err),
600605
}
@@ -628,6 +633,10 @@ func (d *cassandraPersistence) GetShard(request *GetShardRequest) (*GetShardResp
628633
return nil, &workflow.EntityNotExistsError{
629634
Message: fmt.Sprintf("Shard not found. ShardId: %v", shardID),
630635
}
636+
} else if isThrottlingError(err) {
637+
return nil, &workflow.ServiceBusyError{
638+
Message: fmt.Sprintf("GetShard operation failed. Error: %v", err),
639+
}
631640
}
632641

633642
return nil, &workflow.InternalServiceError{
@@ -665,6 +674,11 @@ func (d *cassandraPersistence) UpdateShard(request *UpdateShardRequest) error {
665674
previous := make(map[string]interface{})
666675
applied, err := query.MapScanCAS(previous)
667676
if err != nil {
677+
if isThrottlingError(err) {
678+
return &workflow.ServiceBusyError{
679+
Message: fmt.Sprintf("UpdateShard operation failed. Error: %v", err),
680+
}
681+
}
668682
return &workflow.InternalServiceError{
669683
Message: fmt.Sprintf("UpdateShard operation failed. Error: %v", err),
670684
}
@@ -724,6 +738,10 @@ func (d *cassandraPersistence) CreateWorkflowExecution(request *CreateWorkflowEx
724738
// Write may have succeeded, but we don't know
725739
// return this info to the caller so they have the option of trying to find out by executing a read
726740
return nil, &TimeoutError{Msg: fmt.Sprintf("CreateWorkflowExecution timed out. Error: %v", err)}
741+
} else if isThrottlingError(err) {
742+
return nil, &workflow.ServiceBusyError{
743+
Message: fmt.Sprintf("CreateWorkflowExecution operation failed. Error: %v", err),
744+
}
727745
}
728746
return nil, &workflow.InternalServiceError{
729747
Message: fmt.Sprintf("CreateWorkflowExecution operation failed. Error: %v", err),
@@ -887,6 +905,10 @@ func (d *cassandraPersistence) GetWorkflowExecution(request *GetWorkflowExecutio
887905
Message: fmt.Sprintf("Workflow execution not found. WorkflowId: %v, RunId: %v",
888906
execution.GetWorkflowId(), execution.GetRunId()),
889907
}
908+
} else if isThrottlingError(err) {
909+
return nil, &workflow.ServiceBusyError{
910+
Message: fmt.Sprintf("GetWorkflowExecution operation failed. Error: %v", err),
911+
}
890912
}
891913

892914
return nil, &workflow.InternalServiceError{
@@ -1035,6 +1057,10 @@ func (d *cassandraPersistence) UpdateWorkflowExecution(request *UpdateWorkflowEx
10351057
// Write may have succeeded, but we don't know
10361058
// return this info to the caller so they have the option of trying to find out by executing a read
10371059
return &TimeoutError{Msg: fmt.Sprintf("UpdateWorkflowExecution timed out. Error: %v", err)}
1060+
} else if isThrottlingError(err) {
1061+
return &workflow.ServiceBusyError{
1062+
Message: fmt.Sprintf("UpdateWorkflowExecution operation failed. Error: %v", err),
1063+
}
10381064
}
10391065
return &workflow.InternalServiceError{
10401066
Message: fmt.Sprintf("UpdateWorkflowExecution operation failed. Error: %v", err),
@@ -1109,6 +1135,11 @@ func (d *cassandraPersistence) DeleteWorkflowExecution(request *DeleteWorkflowEx
11091135

11101136
err := query.Exec()
11111137
if err != nil {
1138+
if isThrottlingError(err) {
1139+
return &workflow.ServiceBusyError{
1140+
Message: fmt.Sprintf("DeleteWorkflowExecution operation failed. Error: %v", err),
1141+
}
1142+
}
11121143
return &workflow.InternalServiceError{
11131144
Message: fmt.Sprintf("DeleteWorkflowExecution operation failed. Error: %v", err),
11141145
}
@@ -1135,6 +1166,10 @@ func (d *cassandraPersistence) GetCurrentExecution(request *GetCurrentExecutionR
11351166
Message: fmt.Sprintf("Workflow execution not found. WorkflowId: %v",
11361167
request.WorkflowID),
11371168
}
1169+
} else if isThrottlingError(err) {
1170+
return nil, &workflow.ServiceBusyError{
1171+
Message: fmt.Sprintf("GetCurrentExecution operation failed. Error: %v", err),
1172+
}
11381173
}
11391174

11401175
return nil, &workflow.InternalServiceError{
@@ -1197,6 +1232,11 @@ func (d *cassandraPersistence) CompleteTransferTask(request *CompleteTransferTas
11971232

11981233
err := query.Exec()
11991234
if err != nil {
1235+
if isThrottlingError(err) {
1236+
return &workflow.ServiceBusyError{
1237+
Message: fmt.Sprintf("CompleteTransferTask operation failed. Error: %v", err),
1238+
}
1239+
}
12001240
return &workflow.InternalServiceError{
12011241
Message: fmt.Sprintf("CompleteTransferTask operation failed. Error: %v", err),
12021242
}
@@ -1218,6 +1258,11 @@ func (d *cassandraPersistence) CompleteTimerTask(request *CompleteTimerTaskReque
12181258

12191259
err := query.Exec()
12201260
if err != nil {
1261+
if isThrottlingError(err) {
1262+
return &workflow.ServiceBusyError{
1263+
Message: fmt.Sprintf("CompleteTimerTask operation failed. Error: %v", err),
1264+
}
1265+
}
12211266
return &workflow.InternalServiceError{
12221267
Message: fmt.Sprintf("CompleteTimerTask operation failed. Error: %v", err),
12231268
}
@@ -1256,9 +1301,14 @@ func (d *cassandraPersistence) LeaseTaskList(request *LeaseTaskListRequest) (*Le
12561301
request.TaskList,
12571302
request.TaskType,
12581303
0)
1304+
} else if isThrottlingError(err) {
1305+
return nil, &workflow.ServiceBusyError{
1306+
Message: fmt.Sprintf("LeaseTaskList operation failed. TaskList: %v, TaskType: %v, Error: %v",
1307+
request.TaskList, request.TaskType, err),
1308+
}
12591309
} else {
12601310
return nil, &workflow.InternalServiceError{
1261-
Message: fmt.Sprintf("LeaseTaskList operation failed. TaskList: %v, TaskType: %v, Error : %v",
1311+
Message: fmt.Sprintf("LeaseTaskList operation failed. TaskList: %v, TaskType: %v, Error: %v",
12621312
request.TaskList, request.TaskType, err),
12631313
}
12641314
}
@@ -1281,6 +1331,11 @@ func (d *cassandraPersistence) LeaseTaskList(request *LeaseTaskListRequest) (*Le
12811331
previous := make(map[string]interface{})
12821332
applied, err := query.MapScanCAS(previous)
12831333
if err != nil {
1334+
if isThrottlingError(err) {
1335+
return nil, &workflow.ServiceBusyError{
1336+
Message: fmt.Sprintf("LeaseTaskList operation failed. Error: %v", err),
1337+
}
1338+
}
12841339
return nil, &workflow.InternalServiceError{
12851340
Message: fmt.Sprintf("LeaseTaskList operation failed. Error : %v", err),
12861341
}
@@ -1316,6 +1371,11 @@ func (d *cassandraPersistence) UpdateTaskList(request *UpdateTaskListRequest) (*
13161371
previous := make(map[string]interface{})
13171372
applied, err := query.MapScanCAS(previous)
13181373
if err != nil {
1374+
if isThrottlingError(err) {
1375+
return nil, &workflow.ServiceBusyError{
1376+
Message: fmt.Sprintf("UpdateTaskList operation failed. Error: %v", err),
1377+
}
1378+
}
13191379
return nil, &workflow.InternalServiceError{
13201380
Message: fmt.Sprintf("UpdateTaskList operation failed. Error: %v", err),
13211381
}
@@ -1390,8 +1450,13 @@ func (d *cassandraPersistence) CreateTasks(request *CreateTasksRequest) (*Create
13901450
previous := make(map[string]interface{})
13911451
applied, _, err := d.session.MapExecuteBatchCAS(batch, previous)
13921452
if err != nil {
1453+
if isThrottlingError(err) {
1454+
return nil, &workflow.ServiceBusyError{
1455+
Message: fmt.Sprintf("CreateTasks operation failed. Error: %v", err),
1456+
}
1457+
}
13931458
return nil, &workflow.InternalServiceError{
1394-
Message: fmt.Sprintf("CreateTask operation failed. Error : %v", err),
1459+
Message: fmt.Sprintf("CreateTasks operation failed. Error : %v", err),
13951460
}
13961461
}
13971462
if !applied {
@@ -1466,6 +1531,11 @@ func (d *cassandraPersistence) CompleteTask(request *CompleteTaskRequest) error
14661531

14671532
err := query.Exec()
14681533
if err != nil {
1534+
if isThrottlingError(err) {
1535+
return &workflow.ServiceBusyError{
1536+
Message: fmt.Sprintf("CompleteTask operation failed. Error: %v", err),
1537+
}
1538+
}
14691539
return &workflow.InternalServiceError{
14701540
Message: fmt.Sprintf("CompleteTask operation failed. Error: %v", err),
14711541
}
@@ -1507,6 +1577,11 @@ func (d *cassandraPersistence) GetTimerIndexTasks(request *GetTimerIndexTasksReq
15071577
}
15081578

15091579
if err := iter.Close(); err != nil {
1580+
if isThrottlingError(err) {
1581+
return nil, &workflow.ServiceBusyError{
1582+
Message: fmt.Sprintf("GetTimerTasks operation failed. Error: %v", err),
1583+
}
1584+
}
15101585
return nil, &workflow.InternalServiceError{
15111586
Message: fmt.Sprintf("GetTimerTasks operation failed. Error: %v", err),
15121587
}
@@ -2035,6 +2110,14 @@ func isTimeoutError(err error) bool {
20352110
return ok
20362111
}
20372112

2113+
func isThrottlingError(err error) bool {
2114+
if req, ok := err.(gocql.RequestError); ok {
2115+
// gocql does not expose the constant errOverloaded = 0x1001
2116+
return req.Code() == 0x1001
2117+
}
2118+
return false
2119+
}
2120+
20382121
// GetVisibilityTSFrom - helper method to get visibility timestamp
20392122
func GetVisibilityTSFrom(task Task) time.Time {
20402123
switch task.GetType() {

common/persistence/cassandraVisibilityPersistence.go

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,11 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionStarted(
160160
query = query.WithTimestamp(common.UnixNanoToCQLTimestamp(request.StartTimestamp))
161161
err := query.Exec()
162162
if err != nil {
163+
if isThrottlingError(err) {
164+
return &workflow.ServiceBusyError{
165+
Message: fmt.Sprintf("RecordWorkflowExecutionStarted operation failed. Error: %v", err),
166+
}
167+
}
163168
return &workflow.InternalServiceError{
164169
Message: fmt.Sprintf("RecordWorkflowExecutionStarted operation failed. Error: %v", err),
165170
}
@@ -204,6 +209,11 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosed(
204209
batch = batch.WithTimestamp(common.UnixNanoToCQLTimestamp(request.CloseTimestamp))
205210
err := v.session.ExecuteBatch(batch)
206211
if err != nil {
212+
if isThrottlingError(err) {
213+
return &workflow.ServiceBusyError{
214+
Message: fmt.Sprintf("RecordWorkflowExecutionClosed operation failed. Error: %v", err),
215+
}
216+
}
207217
return &workflow.InternalServiceError{
208218
Message: fmt.Sprintf("RecordWorkflowExecutionClosed operation failed. Error: %v", err),
209219
}
@@ -238,6 +248,11 @@ func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutions(
238248
response.NextPageToken = make([]byte, len(nextPageToken))
239249
copy(response.NextPageToken, nextPageToken)
240250
if err := iter.Close(); err != nil {
251+
if isThrottlingError(err) {
252+
return nil, &workflow.ServiceBusyError{
253+
Message: fmt.Sprintf("ListOpenWorkflowExecutions operation failed. Error: %v", err),
254+
}
255+
}
241256
return nil, &workflow.InternalServiceError{
242257
Message: fmt.Sprintf("ListOpenWorkflowExecutions operation failed. Error: %v", err),
243258
}
@@ -257,7 +272,7 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutions(
257272
if iter == nil {
258273
// TODO: should return a bad request error if the token is invalid
259274
return nil, &workflow.InternalServiceError{
260-
Message: "ListOpenWorkflowExecutions operation failed. Not able to create query iterator.",
275+
Message: "ListClosedWorkflowExecutions operation failed. Not able to create query iterator.",
261276
}
262277
}
263278

@@ -273,8 +288,13 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutions(
273288
response.NextPageToken = make([]byte, len(nextPageToken))
274289
copy(response.NextPageToken, nextPageToken)
275290
if err := iter.Close(); err != nil {
291+
if isThrottlingError(err) {
292+
return nil, &workflow.ServiceBusyError{
293+
Message: fmt.Sprintf("ListClosedWorkflowExecutions operation failed. Error: %v", err),
294+
}
295+
}
276296
return nil, &workflow.InternalServiceError{
277-
Message: fmt.Sprintf("ListOpenWorkflowExecutions operation failed. Error: %v", err),
297+
Message: fmt.Sprintf("ListClosedWorkflowExecutions operation failed. Error: %v", err),
278298
}
279299
}
280300

@@ -309,6 +329,11 @@ func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutionsByType(
309329
response.NextPageToken = make([]byte, len(nextPageToken))
310330
copy(response.NextPageToken, nextPageToken)
311331
if err := iter.Close(); err != nil {
332+
if isThrottlingError(err) {
333+
return nil, &workflow.ServiceBusyError{
334+
Message: fmt.Sprintf("ListOpenWorkflowExecutionsByType operation failed. Error: %v", err),
335+
}
336+
}
312337
return nil, &workflow.InternalServiceError{
313338
Message: fmt.Sprintf("ListOpenWorkflowExecutionsByType operation failed. Error: %v", err),
314339
}
@@ -345,6 +370,11 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutionsByType(
345370
response.NextPageToken = make([]byte, len(nextPageToken))
346371
copy(response.NextPageToken, nextPageToken)
347372
if err := iter.Close(); err != nil {
373+
if isThrottlingError(err) {
374+
return nil, &workflow.ServiceBusyError{
375+
Message: fmt.Sprintf("ListClosedWorkflowExecutionsByType operation failed. Error: %v", err),
376+
}
377+
}
348378
return nil, &workflow.InternalServiceError{
349379
Message: fmt.Sprintf("ListClosedWorkflowExecutionsByType operation failed. Error: %v", err),
350380
}
@@ -381,6 +411,11 @@ func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutionsByWorkflowID(
381411
response.NextPageToken = make([]byte, len(nextPageToken))
382412
copy(response.NextPageToken, nextPageToken)
383413
if err := iter.Close(); err != nil {
414+
if isThrottlingError(err) {
415+
return nil, &workflow.ServiceBusyError{
416+
Message: fmt.Sprintf("ListOpenWorkflowExecutionsByWorkflowID operation failed. Error: %v", err),
417+
}
418+
}
384419
return nil, &workflow.InternalServiceError{
385420
Message: fmt.Sprintf("ListOpenWorkflowExecutionsByWorkflowID operation failed. Error: %v", err),
386421
}
@@ -417,6 +452,11 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutionsByWorkflowI
417452
response.NextPageToken = make([]byte, len(nextPageToken))
418453
copy(response.NextPageToken, nextPageToken)
419454
if err := iter.Close(); err != nil {
455+
if isThrottlingError(err) {
456+
return nil, &workflow.ServiceBusyError{
457+
Message: fmt.Sprintf("ListClosedWorkflowExecutionsByWorkflowID operation failed. Error: %v", err),
458+
}
459+
}
420460
return nil, &workflow.InternalServiceError{
421461
Message: fmt.Sprintf("ListClosedWorkflowExecutionsByWorkflowID operation failed. Error: %v", err),
422462
}
@@ -453,6 +493,11 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutionsByStatus(
453493
response.NextPageToken = make([]byte, len(nextPageToken))
454494
copy(response.NextPageToken, nextPageToken)
455495
if err := iter.Close(); err != nil {
496+
if isThrottlingError(err) {
497+
return nil, &workflow.ServiceBusyError{
498+
Message: fmt.Sprintf("ListClosedWorkflowExecutionsByStatus operation failed. Error: %v", err),
499+
}
500+
}
456501
return nil, &workflow.InternalServiceError{
457502
Message: fmt.Sprintf("ListClosedWorkflowExecutionsByStatus operation failed. Error: %v", err),
458503
}
@@ -486,6 +531,11 @@ func (v *cassandraVisibilityPersistence) GetClosedWorkflowExecution(
486531
}
487532

488533
if err := iter.Close(); err != nil {
534+
if isThrottlingError(err) {
535+
return nil, &workflow.ServiceBusyError{
536+
Message: fmt.Sprintf("GetClosedWorkflowExecution operation failed. Error: %v", err),
537+
}
538+
}
489539
return nil, &workflow.InternalServiceError{
490540
Message: fmt.Sprintf("GetClosedWorkflowExecution operation failed. Error: %v", err),
491541
}

common/persistence/persistenceMetricClients.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,9 @@ func (p *workflowExecutionPersistenceClient) updateErrorMetric(scope int, err er
295295
case *TimeoutError:
296296
p.metricClient.IncCounter(scope, metrics.PersistenceErrTimeoutCounter)
297297
p.metricClient.IncCounter(scope, metrics.PersistenceFailures)
298+
case *workflow.ServiceBusyError:
299+
p.metricClient.IncCounter(scope, metrics.PersistenceErrBusyCounter)
300+
p.metricClient.IncCounter(scope, metrics.PersistenceFailures)
298301
default:
299302
p.metricClient.IncCounter(scope, metrics.PersistenceFailures)
300303
}

0 commit comments

Comments
 (0)