Skip to content

Commit 7f1b7fd

Browse files
kakj-goCounterflowwind
authored andcommitted
task policy (erda-project#2989)
* add pod_name (erda-project#2939) * add pod name and notIn * modify pod_name to component_name * Increase the pipeline task execution strategy, the new strategy can make the task no longer execute but directly take the latest successful result Co-authored-by: panjiayao <[email protected]>
1 parent ec9b67f commit 7f1b7fd

File tree

8 files changed

+486
-0
lines changed

8 files changed

+486
-0
lines changed

apistructs/pipeline_yml.go

+22
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,28 @@ type PipelineYmlAction struct {
106106
Disable bool `json:"disable,omitempty"` // task is disable or enable
107107
Loop *PipelineTaskLoop `json:"loop,omitempty"` // 循环执行
108108
SnippetStages *SnippetStages `json:"snippetStages,omitempty"` // snippetStages snippet 展开
109+
Policy *Policy `json:"policy,omitempty"` // action execution strategy
110+
}
111+
112+
type PolicyType string
113+
114+
// todo add other types of implementation
115+
// try-latest-result (if not exist -> new-run)
116+
// force-latest-result (throw error or wait?)
117+
//
118+
// try-latest-success-result
119+
// force-latest-success-result
120+
//
121+
// new-run (default, can omit)
122+
//
123+
// run-once-from-root-pipeline
124+
const (
125+
NewRunPolicyType PolicyType = "new-run"
126+
TryLatestSuccessResultPolicyType PolicyType = "try-latest-success-result"
127+
)
128+
129+
type Policy struct {
130+
Type PolicyType `json:"type,omitempty"`
109131
}
110132

111133
type SnippetStages struct {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
// Copyright (c) 2021 Terminus, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package reconciler
16+
17+
import (
18+
"fmt"
19+
20+
"github.com/erda-project/erda/apistructs"
21+
"github.com/erda-project/erda/modules/pipeline/dbclient"
22+
"github.com/erda-project/erda/modules/pipeline/spec"
23+
"github.com/erda-project/erda/pkg/parser/pipelineyml"
24+
)
25+
26+
type PolicyHandlerOptions struct {
27+
dbClient *dbclient.Client
28+
}
29+
30+
type PolicyType interface {
31+
ResetTask(*spec.PipelineTask, PolicyHandlerOptions) (*spec.PipelineTask, error)
32+
}
33+
34+
var policyTypeAdaptor = map[apistructs.PolicyType]PolicyType{
35+
apistructs.TryLatestSuccessResultPolicyType: TryLastSuccessResult{},
36+
apistructs.NewRunPolicyType: NewRun{},
37+
}
38+
39+
type NewRun struct{}
40+
41+
func (run NewRun) ResetTask(task *spec.PipelineTask, options PolicyHandlerOptions) (*spec.PipelineTask, error) {
42+
return task, nil
43+
}
44+
45+
type TryLastSuccessResult struct{}
46+
47+
func (t TryLastSuccessResult) ResetTask(task *spec.PipelineTask, opt PolicyHandlerOptions) (*spec.PipelineTask, error) {
48+
if !task.IsSnippet {
49+
return task, nil
50+
}
51+
52+
if task.Extra.Action.SnippetConfig == nil {
53+
return task, nil
54+
}
55+
56+
ymlName, source := getPipelineSourceAndNameBySnippetConfig(task.Extra.Action.SnippetConfig)
57+
58+
runSuccessPipeline, _, _, _, err := opt.dbClient.PageListPipelines(apistructs.PipelinePageListRequest{
59+
Sources: []apistructs.PipelineSource{source},
60+
YmlNames: []string{ymlName},
61+
Statuses: []string{apistructs.PipelineStatusSuccess.String()},
62+
PageNum: 1,
63+
PageSize: 1,
64+
IncludeSnippet: true,
65+
DescCols: []string{apistructs.PipelinePageListRequestIdColumn},
66+
})
67+
if err != nil {
68+
return task, err
69+
}
70+
if len(runSuccessPipeline) <= 0 {
71+
return task, nil
72+
}
73+
pipeline := runSuccessPipeline[0]
74+
75+
beforeSuccessTask, err := opt.dbClient.GetPipelineTask(pipeline.ParentTaskID)
76+
if err != nil {
77+
return task, err
78+
}
79+
80+
if beforeSuccessTask.ID <= 0 {
81+
return task, nil
82+
}
83+
84+
task.Status = beforeSuccessTask.Status
85+
task.Result = beforeSuccessTask.Result
86+
task.IsSnippet = beforeSuccessTask.IsSnippet
87+
task.SnippetPipelineDetail = beforeSuccessTask.SnippetPipelineDetail
88+
task.TimeBegin = beforeSuccessTask.TimeBegin
89+
task.TimeEnd = beforeSuccessTask.TimeEnd
90+
task.CostTimeSec = beforeSuccessTask.CostTimeSec
91+
task.QueueTimeSec = beforeSuccessTask.QueueTimeSec
92+
task.SnippetPipelineID = beforeSuccessTask.SnippetPipelineID
93+
if task.Extra.Action.Policy != nil {
94+
task.Extra.CurrentPolicy = apistructs.Policy{
95+
Type: task.Extra.Action.Policy.Type,
96+
}
97+
}
98+
return task, nil
99+
}
100+
101+
func getPipelineSourceAndNameBySnippetConfig(snippetConfig *pipelineyml.SnippetConfig) (ymlName string, sources apistructs.PipelineSource) {
102+
return snippetConfig.Name, apistructs.PipelineSource(snippetConfig.Source)
103+
}
104+
105+
func (r *Reconciler) adaptPolicy(task *spec.PipelineTask) (result *spec.PipelineTask, err error) {
106+
if task == nil {
107+
return task, fmt.Errorf("task was empty")
108+
}
109+
if task.Extra.Action.Policy == nil {
110+
return task, nil
111+
}
112+
113+
handler := policyTypeAdaptor[task.Extra.Action.Policy.Type]
114+
if handler == nil {
115+
return task, nil
116+
}
117+
118+
opt := PolicyHandlerOptions{
119+
dbClient: r.dbClient,
120+
}
121+
122+
return handler.ResetTask(task, opt)
123+
}

0 commit comments

Comments
 (0)