Skip to content

Commit c6b21ff

Browse files
author
Tamer Eldeeb
authored
Correctly stop shard on rangeID update failure (#168)
This change whitelists error codes that do not trigger rangeID updates for writes. If the rangeID update fail, we fallback to immediately stopping the shard. This was done incorrectly before. Also, add more logging.
1 parent a89819b commit c6b21ff

File tree

3 files changed

+21
-5
lines changed

3 files changed

+21
-5
lines changed

service/history/historyEngine_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedUpdateExecutionFailed() {
198198
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once()
199199
s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once()
200200
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(errors.New("FAILED")).Once()
201+
s.mockShardManager.On("UpdateShard", mock.Anything).Return(nil).Once()
201202

202203
err := s.mockHistoryEngine.RespondDecisionTaskCompleted(&history.RespondDecisionTaskCompletedRequest{
203204
DomainUUID: common.StringPtr(domainID),
@@ -872,6 +873,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedUpdateExecutionFailed() {
872873
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once()
873874
s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once()
874875
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(errors.New("FAILED")).Once()
876+
s.mockShardManager.On("UpdateShard", mock.Anything).Return(nil).Once()
875877

876878
err := s.mockHistoryEngine.RespondActivityTaskCompleted(&history.RespondActivityTaskCompletedRequest{
877879
DomainUUID: common.StringPtr(domainID),
@@ -1246,6 +1248,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedUpdateExecutionFailed() {
12461248
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once()
12471249
s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once()
12481250
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(errors.New("FAILED")).Once()
1251+
s.mockShardManager.On("UpdateShard", mock.Anything).Return(nil).Once()
12491252

12501253
err := s.mockHistoryEngine.RespondActivityTaskFailed(&history.RespondActivityTaskFailedRequest{
12511254
DomainUUID: common.StringPtr(domainID),

service/history/loggingHelpers.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ const (
104104
tagValueStoreOperationGetWorkflowExecution = "get-wf-execution"
105105
tagValueStoreOperationUpdateWorkflowExecution = "update-wf-execution"
106106
tagValueStoreOperationDeleteWorkflowExecution = "delete-wf-execution"
107+
tagValueStoreOperationUpdateShard = "update-shard"
107108
)
108109

109110
func logInvalidHistoryActionEvent(logger bark.Logger, action string, eventID int64, state string) {

service/history/shardContext.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package history
22

33
import (
4+
"fmt"
45
"sync"
56
"sync/atomic"
67

@@ -43,7 +44,7 @@ type (
4344
timerSequenceNumber int64
4445
rangeSize uint
4546
closeCh chan<- int
46-
isClosed int32
47+
isClosed bool
4748
logger bark.Logger
4849
metricsClient metrics.Client
4950

@@ -132,6 +133,7 @@ func (s *shardContextImpl) CreateWorkflowExecution(request *persistence.CreateWo
132133
if err != nil {
133134
return nil, err
134135
}
136+
s.logger.Debugf("Assigning transfer task ID: %v", id)
135137
task.SetTaskID(id)
136138
transferMaxReadLevel = id
137139
}
@@ -155,7 +157,8 @@ Create_Loop:
155157
s.closeShard()
156158
}
157159
}
158-
case *shared.InternalServiceError, *persistence.TimeoutError:
160+
case *shared.WorkflowExecutionAlreadyStartedError:
161+
default:
159162
{
160163
// We have no idea if the write failed or will eventually make it to
161164
// persistence. Increment RangeID to guarantee that subsequent reads
@@ -191,6 +194,7 @@ func (s *shardContextImpl) UpdateWorkflowExecution(request *persistence.UpdateWo
191194
if err != nil {
192195
return err
193196
}
197+
s.logger.Debugf("Assigning transfer task ID: %v", id)
194198
task.SetTaskID(id)
195199
transferMaxReadLevel = id
196200
}
@@ -201,6 +205,7 @@ func (s *shardContextImpl) UpdateWorkflowExecution(request *persistence.UpdateWo
201205
if err != nil {
202206
return err
203207
}
208+
s.logger.Debugf("Assigning transfer task ID: %v", id)
204209
task.SetTaskID(id)
205210
transferMaxReadLevel = id
206211
}
@@ -225,7 +230,8 @@ Update_Loop:
225230
s.closeShard()
226231
}
227232
}
228-
case *shared.InternalServiceError, *persistence.TimeoutError:
233+
case *persistence.ConditionFailedError:
234+
default:
229235
{
230236
// We have no idea if the write failed or will eventually make it to
231237
// persistence. Increment RangeID to guarantee that subsequent reads
@@ -277,11 +283,15 @@ func (s *shardContextImpl) getRangeID() int64 {
277283
}
278284

279285
func (s *shardContextImpl) closeShard() {
280-
if !atomic.CompareAndSwapInt32(&s.isClosed, 0, 1) {
286+
if s.isClosed {
281287
return
282288
}
283289

284-
s.rangeID = -1 // fails any writes that may start after this point.
290+
s.isClosed = true
291+
292+
// fails any writes that may start after this point.
293+
s.shardInfo.RangeID = -1
294+
atomic.StoreInt64(&s.rangeID, s.shardInfo.RangeID)
285295

286296
if s.closeCh != nil {
287297
// This is the channel passed in by shard controller to monitor if a shard needs to be unloaded
@@ -320,6 +330,8 @@ func (s *shardContextImpl) renewRangeLocked(isStealing bool) error {
320330
ShardInfo: updatedShardInfo,
321331
PreviousRangeID: s.shardInfo.RangeID})
322332
if err != nil {
333+
logPersistantStoreErrorEvent(s.logger, tagValueStoreOperationUpdateShard, err,
334+
fmt.Sprintf("{RangeID: %v}", s.shardInfo.RangeID))
323335
// Shard is stolen, trigger history engine shutdown
324336
if _, ok := err.(*persistence.ShardOwnershipLostError); ok {
325337
s.closeShard()

0 commit comments

Comments
 (0)