Skip to content

Commit a89819b

Browse files
authored
Return activity timeouts along with PollForActivityTask response (#167)
* Return activity timeouts along with PollForActivityTask response * Also include scheduledTimestamp and startedTimestamp with the response
1 parent aab5a6f commit a89819b

File tree

4 files changed

+238
-10
lines changed

4 files changed

+238
-10
lines changed

.gen/go/shared/shared.go

Lines changed: 200 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

idl/github.com/uber/cadence/shared.thrift

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -555,12 +555,17 @@ struct PollForActivityTaskRequest {
555555
}
556556

557557
struct PollForActivityTaskResponse {
558-
10: optional binary taskToken
559-
20: optional WorkflowExecution workflowExecution
560-
30: optional string activityId
561-
40: optional ActivityType activityType
562-
50: optional binary input
563-
60: optional i64 (js.type = "Long") startedEventId
558+
10: optional binary taskToken
559+
20: optional WorkflowExecution workflowExecution
560+
30: optional string activityId
561+
40: optional ActivityType activityType
562+
50: optional binary input
563+
60: optional i64 (js.type = "Long") startedEventId
564+
70: optional i64 (js.type = "Long") scheduledTimestamp
565+
80: optional i32 scheduleToCloseTimeoutSeconds
566+
90: optional i64 (js.type = "Long") startedTimestamp
567+
100: optional i32 startToCloseTimeoutSeconds
568+
110: optional i32 heartbeatTimeoutSeconds
564569
}
565570

566571
struct RecordActivityTaskHeartbeatRequest {

service/matching/matchingEngine.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,11 @@ func (e *matchingEngineImpl) createPollForActivityTaskResponse(context *taskCont
361361
response.Input = attributes.GetInput()
362362
response.StartedEventId = common.Int64Ptr(startedEvent.GetEventId())
363363
response.WorkflowExecution = workflowExecutionPtr(context.workflowExecution)
364+
response.ScheduledTimestamp = common.Int64Ptr(scheduledEvent.GetTimestamp())
365+
response.ScheduleToCloseTimeoutSeconds = common.Int32Ptr(attributes.GetScheduleToCloseTimeoutSeconds())
366+
response.StartedTimestamp = common.Int64Ptr(startedEvent.GetTimestamp())
367+
response.StartToCloseTimeoutSeconds = common.Int32Ptr(attributes.GetStartToCloseTimeoutSeconds())
368+
response.HeartbeatTimeoutSeconds = common.Int32Ptr(attributes.GetHeartbeatTimeoutSeconds())
364369

365370
token := &common.TaskToken{
366371
DomainID: task.DomainID,

service/matching/matchingEngine_test.go

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -297,10 +297,13 @@ func (s *matchingEngineSuite) TestAddThenConsumeActivities() {
297297
return &gohistory.RecordActivityTaskStartedResponse{
298298
ScheduledEvent: newActivityTaskScheduledEvent(*taskRequest.ScheduleId, 0,
299299
&workflow.ScheduleActivityTaskDecisionAttributes{
300-
ActivityId: &activityID,
301-
TaskList: &workflow.TaskList{Name: taskList.Name},
302-
ActivityType: activityType,
303-
Input: activityInput,
300+
ActivityId: &activityID,
301+
TaskList: &workflow.TaskList{Name: taskList.Name},
302+
ActivityType: activityType,
303+
Input: activityInput,
304+
ScheduleToCloseTimeoutSeconds: common.Int32Ptr(100),
305+
StartToCloseTimeoutSeconds: common.Int32Ptr(50),
306+
HeartbeatTimeoutSeconds: common.Int32Ptr(10),
304307
}),
305308
StartedEvent: newActivityTaskStartedEvent(startedID, 0, &workflow.PollForActivityTaskRequest{
306309
TaskList: &workflow.TaskList{Name: taskList.Name},
@@ -329,6 +332,11 @@ func (s *matchingEngineSuite) TestAddThenConsumeActivities() {
329332
s.EqualValues(activityInput, result.Input)
330333
s.EqualValues(startedID, *result.StartedEventId)
331334
s.EqualValues(workflowExecution, *result.WorkflowExecution)
335+
s.Equal(true, validateTimeRange(time.Unix(0, result.GetScheduledTimestamp()), time.Minute))
336+
s.Equal(int32(100), result.GetScheduleToCloseTimeoutSeconds())
337+
s.Equal(true, validateTimeRange(time.Unix(0, result.GetStartedTimestamp()), time.Minute))
338+
s.Equal(int32(50), result.GetStartToCloseTimeoutSeconds())
339+
s.Equal(int32(10), result.GetHeartbeatTimeoutSeconds())
332340
token := &common.TaskToken{
333341
DomainID: domainID,
334342
WorkflowID: workflowID,
@@ -1320,3 +1328,13 @@ func (m *testTaskManager) String() string {
13201328
}
13211329
return result
13221330
}
1331+
1332+
func validateTimeRange(t time.Time, expectedDuration time.Duration) bool {
1333+
currentTime := time.Now()
1334+
diff := time.Duration(currentTime.UnixNano() - t.UnixNano())
1335+
if diff > expectedDuration {
1336+
log.Infof("Current time: %v, Application time: %v, Differenrce: %v", currentTime, t, diff)
1337+
return false
1338+
}
1339+
return true
1340+
}

0 commit comments

Comments
 (0)