@@ -246,6 +246,12 @@ const (
246
246
templateUpdateLeaseQuery = `UPDATE executions ` +
247
247
`SET range_id = ? ` +
248
248
`WHERE shard_id = ? ` +
249
+ `and type = ? ` +
250
+ `and domain_id = ? ` +
251
+ `and workflow_id = ? ` +
252
+ `and run_id = ? ` +
253
+ `and visibility_ts = ? ` +
254
+ `and task_id = ? ` +
249
255
`IF range_id = ?`
250
256
251
257
templateGetWorkflowExecutionQuery = `SELECT execution, activity_map, timer_map, child_executions_map, request_cancel_map ` +
@@ -277,7 +283,7 @@ const (
277
283
`and run_id = ? ` +
278
284
`and visibility_ts = ? ` +
279
285
`and task_id = ? ` +
280
- `IF next_event_id = ? and range_id = ? `
286
+ `IF next_event_id = ?`
281
287
282
288
templateUpdateActivityInfoQuery = `UPDATE executions ` +
283
289
`SET activity_map[ ? ] =` + templateActivityInfoType + ` ` +
@@ -288,7 +294,7 @@ const (
288
294
`and run_id = ? ` +
289
295
`and visibility_ts = ? ` +
290
296
`and task_id = ? ` +
291
- `IF next_event_id = ? and range_id = ? `
297
+ `IF next_event_id = ?`
292
298
293
299
templateUpdateTimerInfoQuery = `UPDATE executions ` +
294
300
`SET timer_map[ ? ] =` + templateTimerInfoType + ` ` +
@@ -299,7 +305,7 @@ const (
299
305
`and run_id = ? ` +
300
306
`and visibility_ts = ? ` +
301
307
`and task_id = ? ` +
302
- `IF next_event_id = ? and range_id = ? `
308
+ `IF next_event_id = ?`
303
309
304
310
templateUpdateChildExecutionInfoQuery = `UPDATE executions ` +
305
311
`SET child_executions_map[ ? ] =` + templateChildExecutionInfoType + ` ` +
@@ -310,7 +316,7 @@ const (
310
316
`and run_id = ? ` +
311
317
`and visibility_ts = ? ` +
312
318
`and task_id = ? ` +
313
- `IF next_event_id = ? and range_id = ? `
319
+ `IF next_event_id = ?`
314
320
315
321
templateUpdateRequestCancelInfoQuery = `UPDATE executions ` +
316
322
`SET request_cancel_map[ ? ] =` + templateRequestCancelInfoType + ` ` +
@@ -321,7 +327,7 @@ const (
321
327
`and run_id = ? ` +
322
328
`and visibility_ts = ? ` +
323
329
`and task_id = ? ` +
324
- `IF next_event_id = ? and range_id = ? `
330
+ `IF next_event_id = ?`
325
331
326
332
templateDeleteActivityInfoQuery = `DELETE activity_map[ ? ] ` +
327
333
`FROM executions ` +
@@ -332,7 +338,7 @@ const (
332
338
`and run_id = ? ` +
333
339
`and visibility_ts = ? ` +
334
340
`and task_id = ? ` +
335
- `IF next_event_id = ? and range_id = ? `
341
+ `IF next_event_id = ?`
336
342
337
343
templateDeleteTimerInfoQuery = `DELETE timer_map[ ? ] ` +
338
344
`FROM executions ` +
@@ -343,7 +349,7 @@ const (
343
349
`and run_id = ? ` +
344
350
`and visibility_ts = ? ` +
345
351
`and task_id = ? ` +
346
- `IF next_event_id = ? and range_id = ? `
352
+ `IF next_event_id = ?`
347
353
348
354
templateDeleteChildExecutionInfoQuery = `DELETE child_executions_map[ ? ] ` +
349
355
`FROM executions ` +
@@ -354,7 +360,7 @@ const (
354
360
`and run_id = ? ` +
355
361
`and visibility_ts = ? ` +
356
362
`and task_id = ? ` +
357
- `IF next_event_id = ? and range_id = ? `
363
+ `IF next_event_id = ?`
358
364
359
365
templateDeleteRequestCancelInfoQuery = `DELETE request_cancel_map[ ? ] ` +
360
366
`FROM executions ` +
@@ -365,7 +371,7 @@ const (
365
371
`and run_id = ? ` +
366
372
`and visibility_ts = ? ` +
367
373
`and task_id = ? ` +
368
- `IF next_event_id = ? and range_id = ? `
374
+ `IF next_event_id = ?`
369
375
370
376
templateDeleteWorkflowExecutionQuery = `DELETE FROM executions ` +
371
377
`WHERE shard_id = ? ` +
@@ -680,11 +686,18 @@ func (d *cassandraPersistence) CreateWorkflowExecution(request *CreateWorkflowEx
680
686
batch .Query (templateUpdateLeaseQuery ,
681
687
request .RangeID ,
682
688
d .shardID ,
689
+ rowTypeShard ,
690
+ rowTypeShardDomainID ,
691
+ rowTypeShardWorkflowID ,
692
+ rowTypeShardRunID ,
693
+ defaultVisibilityTimestamp ,
694
+ rowTypeShardTaskID ,
683
695
request .RangeID ,
684
696
)
685
697
686
698
previous := make (map [string ]interface {})
687
- applied , _ , err := d .session .MapExecuteBatchCAS (batch , previous )
699
+ applied , iter , err := d .session .MapExecuteBatchCAS (batch , previous )
700
+ defer iter .Close ()
688
701
if err != nil {
689
702
if isTimeoutError (err ) {
690
703
// Write may have succeeded, but we don't know
@@ -694,34 +707,62 @@ func (d *cassandraPersistence) CreateWorkflowExecution(request *CreateWorkflowEx
694
707
return nil , & workflow.InternalServiceError {
695
708
Message : fmt .Sprintf ("CreateWorkflowExecution operation failed. Error: %v" , err ),
696
709
}
710
+
697
711
}
698
712
699
713
if ! applied {
700
- if rangeID , ok := previous ["range_id" ].(int64 ); ok && rangeID != request .RangeID {
701
- // CreateWorkflowExecution failed because rangeID was modified
702
- return nil , & ShardOwnershipLostError {
703
- ShardID : d .shardID ,
704
- Msg : fmt .Sprintf ("Failed to create workflow execution. Request RangeID: %v, Actual RangeID: %v" ,
705
- request .RangeID , rangeID ),
714
+ // There can be two reasons why the query does not get applied. Either the RangeID has changed, or
715
+ // the workflow is already started. Check the row info returned by Cassandra to figure out which one it is.
716
+ GetFailureReasonLoop:
717
+ for {
718
+ rowType , ok := previous ["type" ].(int )
719
+ if ! ok {
720
+ // This should never happen, as all our rows have the type field.
721
+ break GetFailureReasonLoop
722
+ }
723
+
724
+ if rowType == rowTypeShard {
725
+ if rangeID , ok := previous ["range_id" ].(int64 ); ok && rangeID != request .RangeID {
726
+ // CreateWorkflowExecution failed because rangeID was modified
727
+ return nil , & ShardOwnershipLostError {
728
+ ShardID : d .shardID ,
729
+ Msg : fmt .Sprintf ("Failed to create workflow execution. Request RangeID: %v, Actual RangeID: %v" ,
730
+ request .RangeID , rangeID ),
731
+ }
732
+ }
733
+
734
+ } else {
735
+ var columns []string
736
+ for k , v := range previous {
737
+ columns = append (columns , fmt .Sprintf ("%s=%v" , k , v ))
738
+ }
739
+
740
+ if execution , ok := previous ["execution" ].(map [string ]interface {}); ok {
741
+ // CreateWorkflowExecution failed because it already exists
742
+ msg := fmt .Sprintf ("Workflow execution already running. WorkflowId: %v, RunId: %v, rangeID: %v, columns: (%v)" ,
743
+ execution ["workflow_id" ], execution ["run_id" ], request .RangeID , strings .Join (columns , "," ))
744
+ return nil , & workflow.WorkflowExecutionAlreadyStartedError {
745
+ Message : common .StringPtr (msg ),
746
+ StartRequestId : common .StringPtr (fmt .Sprintf ("%v" , execution ["create_request_id" ])),
747
+ RunId : common .StringPtr (fmt .Sprintf ("%v" , execution ["run_id" ])),
748
+ }
749
+ }
750
+ }
751
+
752
+ previous = make (map [string ]interface {})
753
+ if ! iter .MapScan (previous ) {
754
+ // Cassandra returns the actual row that caused a condition failure, so we should always return
755
+ // from the checks above, but just in case.
756
+ break GetFailureReasonLoop
706
757
}
707
758
}
708
759
760
+ // At this point we only know that the write was not applied.
761
+ // Return the row information returned by Cassandra.
709
762
var columns []string
710
763
for k , v := range previous {
711
764
columns = append (columns , fmt .Sprintf ("%s=%v" , k , v ))
712
765
}
713
-
714
- if execution , ok := previous ["execution" ].(map [string ]interface {}); ok {
715
- // CreateWorkflowExecution failed because it already exists
716
- msg := fmt .Sprintf ("Workflow execution already running. WorkflowId: %v, RunId: %v, rangeID: %v, columns: (%v)" ,
717
- execution ["workflow_id" ], execution ["run_id" ], request .RangeID , strings .Join (columns , "," ))
718
- return nil , & workflow.WorkflowExecutionAlreadyStartedError {
719
- Message : common .StringPtr (msg ),
720
- StartRequestId : common .StringPtr (fmt .Sprintf ("%v" , execution ["create_request_id" ])),
721
- RunId : common .StringPtr (fmt .Sprintf ("%v" , execution ["run_id" ])),
722
- }
723
- }
724
-
725
766
return nil , & ConditionFailedError {
726
767
Msg : fmt .Sprintf ("Failed to create workflow execution. Request RangeID: %v, columns: (%v)" ,
727
768
request .RangeID , strings .Join (columns , "," )),
@@ -910,8 +951,7 @@ func (d *cassandraPersistence) UpdateWorkflowExecution(request *UpdateWorkflowEx
910
951
executionInfo .RunID ,
911
952
defaultVisibilityTimestamp ,
912
953
rowTypeExecutionTaskID ,
913
- request .Condition ,
914
- request .RangeID )
954
+ request .Condition )
915
955
916
956
d .createTransferTasks (batch , request .TransferTasks , executionInfo .DomainID , executionInfo .WorkflowID ,
917
957
executionInfo .RunID , cqlNowTimestamp )
@@ -948,8 +988,22 @@ func (d *cassandraPersistence) UpdateWorkflowExecution(request *UpdateWorkflowEx
948
988
rowTypeExecutionTaskID )
949
989
}
950
990
991
+ // Verifies that the RangeID has not changed
992
+ batch .Query (templateUpdateLeaseQuery ,
993
+ request .RangeID ,
994
+ d .shardID ,
995
+ rowTypeShard ,
996
+ rowTypeShardDomainID ,
997
+ rowTypeShardWorkflowID ,
998
+ rowTypeShardRunID ,
999
+ defaultVisibilityTimestamp ,
1000
+ rowTypeShardTaskID ,
1001
+ request .RangeID ,
1002
+ )
1003
+
951
1004
previous := make (map [string ]interface {})
952
- applied , _ , err := d .session .MapExecuteBatchCAS (batch , previous )
1005
+ applied , iter , err := d .session .MapExecuteBatchCAS (batch , previous )
1006
+ defer iter .Close ()
953
1007
if err != nil {
954
1008
if isTimeoutError (err ) {
955
1009
// Write may have succeeded, but we don't know
@@ -962,23 +1016,45 @@ func (d *cassandraPersistence) UpdateWorkflowExecution(request *UpdateWorkflowEx
962
1016
}
963
1017
964
1018
if ! applied {
965
- if rangeID , ok := previous ["range_id" ].(int64 ); ok && rangeID != request .RangeID {
966
- // UpdateWorkflowExecution failed because rangeID was modified
967
- return & ShardOwnershipLostError {
968
- ShardID : d .shardID ,
969
- Msg : fmt .Sprintf ("Failed to update workflow execution. Request RangeID: %v, Actual RangeID: %v" ,
970
- request .RangeID , rangeID ),
1019
+ // There can be two reasons why the query does not get applied. Either the RangeID has changed, or
1020
+ // the next_event_id check failed. Check the row info returned by Cassandra to figure out which one it is.
1021
+ GetFailureReasonLoop:
1022
+ for {
1023
+ rowType , ok := previous ["type" ].(int )
1024
+ if ! ok {
1025
+ // This should never happen, as all our rows have the type field.
1026
+ break GetFailureReasonLoop
1027
+ }
1028
+
1029
+ if rowType == rowTypeShard {
1030
+ if rangeID , ok := previous ["range_id" ].(int64 ); ok && rangeID != request .RangeID {
1031
+ // UpdateWorkflowExecution failed because rangeID was modified
1032
+ return & ShardOwnershipLostError {
1033
+ ShardID : d .shardID ,
1034
+ Msg : fmt .Sprintf ("Failed to update workflow execution. Request RangeID: %v, Actual RangeID: %v" ,
1035
+ request .RangeID , rangeID ),
1036
+ }
1037
+ }
1038
+ } else {
1039
+ if nextEventID , ok := previous ["next_event_id" ].(int64 ); ok && nextEventID != request .Condition {
1040
+ // UpdateWorkflowExecution failed because next event ID is unexpected
1041
+ return & ConditionFailedError {
1042
+ Msg : fmt .Sprintf ("Failed to update workflow execution. Request Condition: %v, Actual Value: %v" ,
1043
+ request .Condition , nextEventID ),
1044
+ }
1045
+ }
971
1046
}
972
- }
973
1047
974
- if nextEventID , ok := previous [ "next_event_id" ].( int64 ); ok && nextEventID != request . Condition {
975
- // CreateWorkflowExecution failed because next event ID is unexpected
976
- return & ConditionFailedError {
977
- Msg : fmt . Sprintf ( "Failed to update workflow execution. Request Condition: %v, Actual Value: %v" ,
978
- request . Condition , nextEventID ),
1048
+ previous = make ( map [ string ] interface {})
1049
+ if ! iter . MapScan ( previous ) {
1050
+ // Cassandra returns the actual row that caused a condition failure, so we should always return
1051
+ // from the checks above, but just in case.
1052
+ break GetFailureReasonLoop
979
1053
}
980
1054
}
981
1055
1056
+ // At this point we only know that the write was not applied.
1057
+ // Return the row information returned by Cassandra.
982
1058
var columns []string
983
1059
for k , v := range previous {
984
1060
columns = append (columns , fmt .Sprintf ("%s=%v" , k , v ))
@@ -1549,8 +1625,7 @@ func (d *cassandraPersistence) updateActivityInfos(batch *gocql.Batch, activityI
1549
1625
runID ,
1550
1626
defaultVisibilityTimestamp ,
1551
1627
rowTypeExecutionTaskID ,
1552
- condition ,
1553
- rangeID )
1628
+ condition )
1554
1629
}
1555
1630
1556
1631
if deleteInfo != nil {
@@ -1563,8 +1638,7 @@ func (d *cassandraPersistence) updateActivityInfos(batch *gocql.Batch, activityI
1563
1638
runID ,
1564
1639
defaultVisibilityTimestamp ,
1565
1640
rowTypeExecutionTaskID ,
1566
- condition ,
1567
- rangeID )
1641
+ condition )
1568
1642
}
1569
1643
}
1570
1644
@@ -1585,8 +1659,7 @@ func (d *cassandraPersistence) updateTimerInfos(batch *gocql.Batch, timerInfos [
1585
1659
runID ,
1586
1660
defaultVisibilityTimestamp ,
1587
1661
rowTypeExecutionTaskID ,
1588
- condition ,
1589
- rangeID )
1662
+ condition )
1590
1663
}
1591
1664
1592
1665
for _ , t := range deleteInfos {
@@ -1599,8 +1672,7 @@ func (d *cassandraPersistence) updateTimerInfos(batch *gocql.Batch, timerInfos [
1599
1672
runID ,
1600
1673
defaultVisibilityTimestamp ,
1601
1674
rowTypeExecutionTaskID ,
1602
- condition ,
1603
- rangeID )
1675
+ condition )
1604
1676
}
1605
1677
}
1606
1678
@@ -1622,8 +1694,7 @@ func (d *cassandraPersistence) updateChildExecutionInfos(batch *gocql.Batch, chi
1622
1694
runID ,
1623
1695
defaultVisibilityTimestamp ,
1624
1696
rowTypeExecutionTaskID ,
1625
- condition ,
1626
- rangeID )
1697
+ condition )
1627
1698
}
1628
1699
1629
1700
// deleteInfo is the initiatedID for ChildInfo being deleted
@@ -1637,8 +1708,7 @@ func (d *cassandraPersistence) updateChildExecutionInfos(batch *gocql.Batch, chi
1637
1708
runID ,
1638
1709
defaultVisibilityTimestamp ,
1639
1710
rowTypeExecutionTaskID ,
1640
- condition ,
1641
- rangeID )
1711
+ condition )
1642
1712
}
1643
1713
}
1644
1714
@@ -1657,8 +1727,7 @@ func (d *cassandraPersistence) updateRequestCancelInfos(batch *gocql.Batch, requ
1657
1727
runID ,
1658
1728
defaultVisibilityTimestamp ,
1659
1729
rowTypeExecutionTaskID ,
1660
- condition ,
1661
- rangeID )
1730
+ condition )
1662
1731
}
1663
1732
1664
1733
// deleteInfo is the initiatedID for RequestCancelInfo being deleted
@@ -1672,8 +1741,7 @@ func (d *cassandraPersistence) updateRequestCancelInfos(batch *gocql.Batch, requ
1672
1741
runID ,
1673
1742
defaultVisibilityTimestamp ,
1674
1743
rowTypeExecutionTaskID ,
1675
- condition ,
1676
- rangeID )
1744
+ condition )
1677
1745
}
1678
1746
}
1679
1747
0 commit comments