Skip to content

Commit 48e567c

Browse files
committed
apitest action don't set task pointers value
1 parent a1a45fd commit 48e567c

File tree

4 files changed

+60
-18
lines changed

4 files changed

+60
-18
lines changed

modules/pipeline/pipengine/actionexecutor/plugins/apitest/apitest.go

+42-4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import (
1818
"context"
1919
"fmt"
2020

21+
"github.com/sirupsen/logrus"
22+
2123
"github.com/erda-project/erda/apistructs"
2224
"github.com/erda-project/erda/modules/pipeline/dbclient"
2325
"github.com/erda-project/erda/modules/pipeline/pipengine/actionexecutor/plugins/apitest/logic"
@@ -57,7 +59,41 @@ func (d *define) Create(ctx context.Context, task *spec.PipelineTask) (interface
5759
}
5860

5961
func (d *define) Start(ctx context.Context, task *spec.PipelineTask) (interface{}, error) {
60-
logic.Do(ctx, task)
62+
63+
go func(ctx context.Context, task *spec.PipelineTask) {
64+
executorDoneCh, ok := ctx.Value(spec.MakeTaskExecutorCtxKey(task)).(chan interface{})
65+
if !ok {
66+
logrus.Warnf("apitest: failed to get executor channel, pipelineID: %d, taskID: %d", task.PipelineID, task.ID)
67+
}
68+
69+
var status = apistructs.PipelineStatusFailed
70+
defer func() {
71+
if r := recover(); r != nil {
72+
logrus.Errorf("api-test logic do panic recover:%s", r)
73+
}
74+
// if executor chan is nil, task framework can loop query meta get status
75+
if executorDoneCh != nil {
76+
executorDoneCh <- apistructs.PipelineStatusDesc{Status: status}
77+
}
78+
}()
79+
80+
logic.Do(ctx, task)
81+
82+
latestTask, err := d.dbClient.GetPipelineTask(task.ID)
83+
if err != nil {
84+
logrus.Errorf("failed to query latest task, err: %v \n", err)
85+
return
86+
}
87+
88+
meta := latestTask.Result.Metadata
89+
for _, metaField := range meta {
90+
if metaField.Name == logic.MetaKeyResult {
91+
if metaField.Value == logic.ResultSuccess {
92+
status = apistructs.PipelineStatusSuccess
93+
}
94+
}
95+
}
96+
}(ctx, task)
6197
return nil, nil
6298
}
6399

@@ -70,7 +106,7 @@ func (d *define) Status(ctx context.Context, task *spec.PipelineTask) (apistruct
70106
if err != nil {
71107
return apistructs.PipelineStatusDesc{}, fmt.Errorf("failed to query latest task, err: %v", err)
72108
}
73-
*task = latestTask
109+
//*task = latestTask
74110

75111
if task.Status.IsEndStatus() {
76112
return apistructs.PipelineStatusDesc{Status: task.Status}, nil
@@ -92,12 +128,14 @@ func (d *define) Status(ctx context.Context, task *spec.PipelineTask) (apistruct
92128
if metaField.Value == logic.ResultSuccess {
93129
return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusSuccess}, nil
94130
}
95-
return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusFailed}, nil
131+
if metaField.Value == logic.ResultFailed {
132+
return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusFailed}, nil
133+
}
96134
}
97135
}
98136

99137
// return created status to do start step
100-
return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusCreated}, nil
138+
return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusRunning}, nil
101139
}
102140

103141
func (d *define) Inspect(ctx context.Context, task *spec.PipelineTask) (apistructs.TaskInspect, error) {

modules/pipeline/pipengine/actionexecutor/plugins/apitest/logic/meta.go

+12-6
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919
"encoding/json"
2020
"strconv"
21+
"time"
2122

2223
"github.com/erda-project/erda/apistructs"
2324
"github.com/erda-project/erda/modules/actionagent"
@@ -26,6 +27,7 @@ import (
2627
"github.com/erda-project/erda/pkg/apitestsv2"
2728
"github.com/erda-project/erda/pkg/apitestsv2/cookiejar"
2829
"github.com/erda-project/erda/pkg/encoding/jsonparse"
30+
"github.com/erda-project/erda/pkg/loop"
2931
)
3032

3133
const (
@@ -112,11 +114,15 @@ func writeMetaFile(ctx context.Context, task *spec.PipelineTask, meta *Meta) {
112114
cb.PipelineTaskID = task.ID
113115

114116
cbData, _ := json.Marshal(&cb)
115-
err := pipelinefunc.CallbackActionFunc(cbData)
116-
if err != nil {
117-
log.Errorf("failed to callback, err: %v", err)
118-
return
119-
}
120-
117+
// apitest should ensure that callback to pipeline after doing request
118+
_ = loop.New(loop.WithDeclineRatio(2), loop.WithDeclineLimit(10*time.Second)).
119+
Do(func() (bool, error) {
120+
err := pipelinefunc.CallbackActionFunc(cbData)
121+
if err != nil {
122+
log.Errorf("failed to callback, err: %v", err)
123+
return false, err
124+
}
125+
return true, nil
126+
})
121127
return
122128
}

modules/pipeline/pipengine/reconciler/reconcile.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -104,13 +104,10 @@ func (r *Reconciler) reconcile(ctx context.Context, pipelineID uint64) error {
104104
if err != nil {
105105
return
106106
}
107-
108-
executorChan := make(chan interface{})
109-
ctx = context.WithValue(ctx, spec.MakeTaskExecutorCtxKey(task), executorChan)
110107
tr := taskrun.New(ctx, task,
111108
ctx.Value(ctxKeyPipelineExitCh).(chan struct{}), ctx.Value(ctxKeyPipelineExitChCancelFunc).(context.CancelFunc),
112109
r.TaskThrottler, executor, &p, r.bdl, r.dbClient, r.js,
113-
r.actionAgentSvc, r.extMarketSvc, executorChan)
110+
r.actionAgentSvc, r.extMarketSvc)
114111

115112
// tear down task
116113
defer func() {

modules/pipeline/pipengine/reconciler/taskrun/taskrun.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ type TaskRun struct {
5353
PExitCh <-chan struct{}
5454
PExitChCancel context.CancelFunc
5555
PExit bool
56-
ExecutorDoneCh chan interface{} // executorDoneCh allow action executor return directly
56+
ExecutorDoneCh chan interface{}
5757

5858
// 轮训状态间隔期间可能任务已经是终态,FakeTimeout = true
5959
FakeTimeout bool
@@ -70,10 +70,11 @@ func New(ctx context.Context, task *spec.PipelineTask,
7070
executor types.ActionExecutor, p *spec.Pipeline, bdl *bundle.Bundle, dbClient *dbclient.Client, js jsonstore.JsonStore,
7171
actionAgentSvc *actionagentsvc.ActionAgentSvc,
7272
extMarketSvc *extmarketsvc.ExtMarketSvc,
73-
executorCh chan interface{},
7473
) *TaskRun {
74+
// make executor has buffer, don't block task framework
75+
executorCH := make(chan interface{}, 1)
7576
return &TaskRun{
76-
Ctx: ctx,
77+
Ctx: context.WithValue(ctx, spec.MakeTaskExecutorCtxKey(task), executorCH),
7778
Task: task,
7879
Executor: executor,
7980
Throttler: throttler,
@@ -91,7 +92,7 @@ func New(ctx context.Context, task *spec.PipelineTask,
9192

9293
PExitCh: pExitCh,
9394
PExitChCancel: pExitChCancel,
94-
ExecutorDoneCh: executorCh,
95+
ExecutorDoneCh: executorCH,
9596

9697
ActionAgentSvc: actionAgentSvc,
9798
ExtMarketSvc: extMarketSvc,

0 commit comments

Comments
 (0)