Skip to content

Commit e047ef3

Browse files
authored
[Wf-Diagnostics] retrieve workflow execution history within issue identification activity (#6607)
1 parent bdce46c commit e047ef3

File tree

5 files changed

+18
-45
lines changed

5 files changed

+18
-45
lines changed

service/worker/diagnostics/activities.go

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -39,30 +39,26 @@ const (
3939
WfDiagnosticsAppName = "workflow-diagnostics"
4040
)
4141

42-
type retrieveExecutionHistoryInputParams struct {
43-
Domain string
42+
type identifyIssuesParams struct {
4443
Execution *types.WorkflowExecution
44+
Domain string
4545
}
4646

47-
func (w *dw) retrieveExecutionHistory(ctx context.Context, info retrieveExecutionHistoryInputParams) (*types.GetWorkflowExecutionHistoryResponse, error) {
47+
func (w *dw) identifyIssues(ctx context.Context, info identifyIssuesParams) ([]invariant.InvariantCheckResult, error) {
48+
result := make([]invariant.InvariantCheckResult, 0)
49+
4850
frontendClient := w.clientBean.GetFrontendClient()
49-
return frontendClient.GetWorkflowExecutionHistory(ctx, &types.GetWorkflowExecutionHistoryRequest{
51+
history, err := frontendClient.GetWorkflowExecutionHistory(ctx, &types.GetWorkflowExecutionHistoryRequest{
5052
Domain: info.Domain,
5153
Execution: info.Execution,
5254
})
53-
}
54-
55-
type identifyIssuesParams struct {
56-
History *types.GetWorkflowExecutionHistoryResponse
57-
Domain string
58-
}
59-
60-
func (w *dw) identifyIssues(ctx context.Context, info identifyIssuesParams) ([]invariant.InvariantCheckResult, error) {
61-
result := make([]invariant.InvariantCheckResult, 0)
55+
if err != nil {
56+
return nil, err
57+
}
6258

6359
for _, inv := range w.invariants {
6460
issues, err := inv.Check(ctx, invariant.InvariantCheckInput{
65-
WorkflowExecutionHistory: info.History,
61+
WorkflowExecutionHistory: history,
6662
Domain: info.Domain,
6763
})
6864
if err != nil {

service/worker/diagnostics/activities_test.go

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -48,19 +48,6 @@ const (
4848
timeUnit = time.Second
4949
)
5050

51-
func Test__retrieveExecutionHistory(t *testing.T) {
52-
dwtest := testDiagnosticWorkflow(t)
53-
result, err := dwtest.retrieveExecutionHistory(context.Background(), retrieveExecutionHistoryInputParams{
54-
Domain: "test",
55-
Execution: &types.WorkflowExecution{
56-
WorkflowID: "123",
57-
RunID: "abc",
58-
},
59-
})
60-
require.NoError(t, err)
61-
require.Equal(t, testWorkflowExecutionHistoryResponse(), result)
62-
}
63-
6451
func Test__identifyIssues(t *testing.T) {
6552
dwtest := testDiagnosticWorkflow(t)
6653
actMetadata := failure.FailureMetadata{
@@ -100,7 +87,10 @@ func Test__identifyIssues(t *testing.T) {
10087
Metadata: retryMetadataInBytes,
10188
},
10289
}
103-
result, err := dwtest.identifyIssues(context.Background(), identifyIssuesParams{History: testWorkflowExecutionHistoryResponse()})
90+
result, err := dwtest.identifyIssues(context.Background(), identifyIssuesParams{Execution: &types.WorkflowExecution{
91+
WorkflowID: "123",
92+
RunID: "abc",
93+
}})
10494
require.NoError(t, err)
10595
require.Equal(t, expectedResult, result)
10696
}

service/worker/diagnostics/module.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ func (w *dw) Start() error {
9191
newWorker := worker.New(w.svcClient, common.SystemLocalDomainName, tasklist, workerOpts)
9292
newWorker.RegisterWorkflowWithOptions(w.DiagnosticsWorkflow, workflow.RegisterOptions{Name: diagnosticsWorkflow})
9393
newWorker.RegisterWorkflowWithOptions(w.DiagnosticsStarterWorkflow, workflow.RegisterOptions{Name: diagnosticsStarterWorkflow})
94-
newWorker.RegisterActivityWithOptions(w.retrieveExecutionHistory, activity.RegisterOptions{Name: retrieveWfExecutionHistoryActivity})
9594
newWorker.RegisterActivityWithOptions(w.identifyIssues, activity.RegisterOptions{Name: identifyIssuesActivity})
9695
newWorker.RegisterActivityWithOptions(w.rootCauseIssues, activity.RegisterOptions{Name: rootCauseIssuesActivity})
9796
newWorker.RegisterActivityWithOptions(w.emitUsageLogs, activity.RegisterOptions{Name: emitUsageLogsActivity})

service/worker/diagnostics/workflow.go

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -125,23 +125,14 @@ func (w *dw) DiagnosticsWorkflow(ctx workflow.Context, params DiagnosticsWorkflo
125125
StartToCloseTimeout: time.Second * 5,
126126
}
127127
activityCtx := workflow.WithActivityOptions(ctx, activityOptions)
128+
timeoutsResult.Runbooks = []string{linkToTimeoutsRunbook}
128129

129-
var wfExecutionHistory *types.GetWorkflowExecutionHistoryResponse
130-
err := workflow.ExecuteActivity(activityCtx, w.retrieveExecutionHistory, retrieveExecutionHistoryInputParams{
131-
Domain: params.Domain,
130+
err := workflow.ExecuteActivity(activityCtx, w.identifyIssues, identifyIssuesParams{
132131
Execution: &types.WorkflowExecution{
133132
WorkflowID: params.WorkflowID,
134133
RunID: params.RunID,
135-
}}).Get(ctx, &wfExecutionHistory)
136-
if err != nil {
137-
return nil, fmt.Errorf("RetrieveExecutionHistory: %w", err)
138-
}
139-
140-
timeoutsResult.Runbooks = []string{linkToTimeoutsRunbook}
141-
142-
err = workflow.ExecuteActivity(activityCtx, w.identifyIssues, identifyIssuesParams{
143-
History: wfExecutionHistory,
144-
Domain: params.Domain,
134+
},
135+
Domain: params.Domain,
145136
}).Get(ctx, &checkResult)
146137
if err != nil {
147138
return nil, fmt.Errorf("IdentifyIssues: %w", err)

service/worker/diagnostics/workflow_test.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ func (s *diagnosticsWorkflowTestSuite) SetupTest() {
7575

7676
s.workflowEnv.RegisterWorkflowWithOptions(s.dw.DiagnosticsStarterWorkflow, workflow.RegisterOptions{Name: diagnosticsStarterWorkflow})
7777
s.workflowEnv.RegisterWorkflowWithOptions(s.dw.DiagnosticsWorkflow, workflow.RegisterOptions{Name: diagnosticsWorkflow})
78-
s.workflowEnv.RegisterActivityWithOptions(s.dw.retrieveExecutionHistory, activity.RegisterOptions{Name: retrieveWfExecutionHistoryActivity})
7978
s.workflowEnv.RegisterActivityWithOptions(s.dw.identifyIssues, activity.RegisterOptions{Name: identifyIssuesActivity})
8079
s.workflowEnv.RegisterActivityWithOptions(s.dw.rootCauseIssues, activity.RegisterOptions{Name: rootCauseIssuesActivity})
8180
s.workflowEnv.RegisterActivityWithOptions(s.dw.emitUsageLogs, activity.RegisterOptions{Name: emitUsageLogsActivity})
@@ -133,7 +132,6 @@ func (s *diagnosticsWorkflowTestSuite) TestWorkflow() {
133132
PollersMetadata: &timeout.PollersMetadata{TaskListBacklog: taskListBacklog},
134133
},
135134
}
136-
s.workflowEnv.OnActivity(retrieveWfExecutionHistoryActivity, mock.Anything, mock.Anything).Return(nil, nil)
137135
s.workflowEnv.OnActivity(identifyIssuesActivity, mock.Anything, mock.Anything).Return(issues, nil)
138136
s.workflowEnv.OnActivity(rootCauseIssuesActivity, mock.Anything, mock.Anything).Return(rootCause, nil)
139137
s.workflowEnv.OnActivity(emitUsageLogsActivity, mock.Anything, mock.Anything).Return(nil)
@@ -157,7 +155,6 @@ func (s *diagnosticsWorkflowTestSuite) TestWorkflow_Error() {
157155
}
158156
mockErr := errors.New("mockErr")
159157
errExpected := fmt.Errorf("IdentifyIssues: %w", mockErr)
160-
s.workflowEnv.OnActivity(retrieveWfExecutionHistoryActivity, mock.Anything, mock.Anything).Return(nil, nil)
161158
s.workflowEnv.OnActivity(identifyIssuesActivity, mock.Anything, mock.Anything).Return(nil, mockErr)
162159
s.workflowEnv.ExecuteWorkflow(diagnosticsWorkflow, params)
163160
s.True(s.workflowEnv.IsWorkflowCompleted())

0 commit comments

Comments
 (0)