Skip to content

Commit 82170cd

Browse files
committed
api-test executor define add snyc.map to store running task
1 parent be514e5 commit 82170cd

File tree

3 files changed

+126
-18
lines changed

3 files changed

+126
-18
lines changed

modules/pipeline/pipengine/actionexecutor/plugins/apitest/apitest.go

+28-8
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package apitest
1717
import (
1818
"context"
1919
"fmt"
20+
"sync"
2021

2122
"github.com/sirupsen/logrus"
2223

@@ -30,9 +31,10 @@ import (
3031
var Kind = types.Kind(spec.PipelineTaskExecutorKindAPITest)
3132

3233
type define struct {
33-
name types.Name
34-
options map[string]string
35-
dbClient *dbclient.Client
34+
name types.Name
35+
options map[string]string
36+
dbClient *dbclient.Client
37+
runningApis sync.Map
3638
}
3739

3840
func (d *define) Kind() types.Kind { return Kind }
@@ -46,7 +48,11 @@ func (d *define) Exist(ctx context.Context, task *spec.PipelineTask) (created bo
4648
case status == apistructs.PipelineStatusCreated:
4749
return true, false, nil
4850
case status == apistructs.PipelineStatusQueue, status == apistructs.PipelineStatusRunning:
49-
return true, true, nil
51+
// if apitest task is not procesing, should make status-started false
52+
if _, alreadyProcessing := d.runningApis.Load(d.makeRunningApiKey(task)); alreadyProcessing {
53+
return true, true, nil
54+
}
55+
return true, false, nil
5056
case status.IsEndStatus():
5157
return true, true, nil
5258
default:
@@ -61,6 +67,10 @@ func (d *define) Create(ctx context.Context, task *spec.PipelineTask) (interface
6167
func (d *define) Start(ctx context.Context, task *spec.PipelineTask) (interface{}, error) {
6268

6369
go func(ctx context.Context, task *spec.PipelineTask) {
70+
if _, alreadyProcessing := d.runningApis.LoadOrStore(d.makeRunningApiKey(task), task); alreadyProcessing {
71+
logrus.Warnf("apitest: task: %d already processing", task.ID)
72+
return
73+
}
6474
executorDoneCh, ok := ctx.Value(spec.MakeTaskExecutorCtxKey(task)).(chan interface{})
6575
if !ok {
6676
logrus.Warnf("apitest: failed to get executor channel, pipelineID: %d, taskID: %d", task.PipelineID, task.ID)
@@ -75,6 +85,7 @@ func (d *define) Start(ctx context.Context, task *spec.PipelineTask) (interface{
7585
if executorDoneCh != nil {
7686
executorDoneCh <- apistructs.PipelineStatusDesc{Status: status}
7787
}
88+
d.runningApis.Delete(d.makeRunningApiKey(task))
7889
}()
7990

8091
logic.Do(ctx, task)
@@ -112,7 +123,7 @@ func (d *define) Status(ctx context.Context, task *spec.PipelineTask) (apistruct
112123
return apistructs.PipelineStatusDesc{Status: task.Status}, nil
113124
}
114125

115-
created, _, err := d.Exist(ctx, task)
126+
created, started, err := d.Exist(ctx, task)
116127
if err != nil {
117128
return apistructs.PipelineStatusDesc{}, err
118129
}
@@ -121,6 +132,10 @@ func (d *define) Status(ctx context.Context, task *spec.PipelineTask) (apistruct
121132
return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusAnalyzed}, nil
122133
}
123134

135+
if !started && len(latestTask.Result.Metadata) == 0 {
136+
return apistructs.PipelineStatusDesc{Status: apistructs.PipelineStatusBorn}, nil
137+
}
138+
124139
// status according to api success or not
125140
meta := latestTask.Result.Metadata
126141
for _, metaField := range meta {
@@ -154,16 +169,21 @@ func (d *define) BatchDelete(ctx context.Context, actions []*spec.PipelineTask)
154169
return nil, nil
155170
}
156171

172+
func (d *define) makeRunningApiKey(task *spec.PipelineTask) string {
173+
return fmt.Sprintf("%d-%d", task.PipelineID, task.ID)
174+
}
175+
157176
func init() {
158177
types.MustRegister(Kind, func(name types.Name, options map[string]string) (types.ActionExecutor, error) {
159178
dbClient, err := dbclient.New()
160179
if err != nil {
161180
return nil, fmt.Errorf("failed to init dbclient, err: %v", err)
162181
}
163182
return &define{
164-
name: name,
165-
options: options,
166-
dbClient: dbClient,
183+
name: name,
184+
options: options,
185+
dbClient: dbClient,
186+
runningApis: sync.Map{},
167187
}, nil
168188
})
169189
}

modules/pipeline/pipengine/reconciler/taskrun/taskop/wait.go

+22-10
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package taskop
1717
import (
1818
"context"
1919
"errors"
20+
"math"
2021
"time"
2122

2223
"github.com/sirupsen/logrus"
@@ -30,6 +31,11 @@ import (
3031

3132
var err4EnableDeclineRatio = errors.New("enable decline ratio")
3233

34+
var (
35+
declineRatio float64 = 1.5
36+
declineLimit time.Duration = 10 * time.Second
37+
)
38+
3339
type wait taskrun.TaskRun
3440

3541
func NewWait(tr *taskrun.TaskRun) *wait {
@@ -46,13 +52,12 @@ func (w *wait) TaskRun() *taskrun.TaskRun {
4652

4753
func (w *wait) Processing() (interface{}, error) {
4854
var (
49-
data interface{}
50-
lastSleepTime float64 = 1
51-
declineRatio = 1.5
52-
declineLimit int = 10
55+
data interface{}
56+
loopedTimes uint64
5357
)
5458

55-
timer := time.NewTimer(time.Duration(lastSleepTime) * time.Second)
59+
timer := time.NewTimer(w.calculateNextLoopTimeDuration(loopedTimes))
60+
defer timer.Stop()
5661
for {
5762
select {
5863
case doneData := <-w.ExecutorDoneCh:
@@ -77,11 +82,9 @@ func (w *wait) Processing() (interface{}, error) {
7782
if statusDesc.Status == apistructs.PipelineStatusUnknown {
7883
logrus.Errorf("[alert] reconciler: pipelineID: %d, task %q wait get status %q, retry", w.P.ID, w.Task.Name, apistructs.PipelineStatusUnknown)
7984
}
80-
lastSleepTime = lastSleepTime * declineRatio
81-
if lastSleepTime > float64(declineLimit) {
82-
lastSleepTime = float64(declineLimit)
83-
}
84-
timer.Reset(time.Duration(lastSleepTime) * time.Second)
85+
86+
loopedTimes++
87+
timer.Reset(w.calculateNextLoopTimeDuration(loopedTimes))
8588
}
8689
}
8790
}
@@ -174,3 +177,12 @@ func (w *wait) TuneTriggers() taskrun.TaskOpTuneTriggers {
174177
AfterProcessing: aoptypes.TuneTriggerTaskAfterWait,
175178
}
176179
}
180+
181+
func (w *wait) calculateNextLoopTimeDuration(loopedTimes uint64) time.Duration {
182+
lastSleepTime := time.Second
183+
lastSleepTime = time.Duration(float64(lastSleepTime) * math.Pow(declineRatio, float64(loopedTimes)))
184+
if lastSleepTime > declineLimit {
185+
return declineLimit
186+
}
187+
return lastSleepTime
188+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Copyright (c) 2021 Terminus, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package taskop
16+
17+
import (
18+
"testing"
19+
20+
"github.com/stretchr/testify/assert"
21+
22+
"github.com/erda-project/erda/modules/pipeline/pipengine/reconciler/taskrun"
23+
)
24+
25+
func TestCalculateNextLoopTimeDuration(t *testing.T) {
26+
tt := []struct {
27+
loopedTimes uint64
28+
want string
29+
}{
30+
{
31+
loopedTimes: 0,
32+
want: "1s",
33+
},
34+
{
35+
loopedTimes: 1,
36+
want: "1.5s",
37+
},
38+
{
39+
loopedTimes: 2,
40+
want: "2.25s",
41+
},
42+
{
43+
loopedTimes: 3,
44+
want: "3.375s",
45+
},
46+
{
47+
loopedTimes: 4,
48+
want: "5.0625s",
49+
},
50+
{
51+
loopedTimes: 5,
52+
want: "7.59375s",
53+
},
54+
{
55+
loopedTimes: 6,
56+
want: "10s",
57+
},
58+
{
59+
loopedTimes: 7,
60+
want: "10s",
61+
},
62+
{
63+
loopedTimes: 8,
64+
want: "10s",
65+
},
66+
{
67+
loopedTimes: 9,
68+
want: "10s",
69+
},
70+
}
71+
72+
w := NewWait(&taskrun.TaskRun{})
73+
for i := range tt {
74+
assert.Equal(t, tt[i].want, w.calculateNextLoopTimeDuration(tt[i].loopedTimes).String())
75+
}
76+
}

0 commit comments

Comments
 (0)