Skip to content

Commit 5502e72

Browse files
committed
send pop out pipeline id to etcd, and watch pop out in master
1 parent 0845947 commit 5502e72

File tree

4 files changed

+58
-8
lines changed

4 files changed

+58
-8
lines changed

modules/pipeline/endpoints/pipeline.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ func (e *Endpoints) pipelineCancel(ctx context.Context, r *http.Request, vars ma
262262
}
263263

264264
if e.reconciler.QueueManager != nil {
265-
e.reconciler.QueueManager.PopOutPipelineFromQueue(pipelineID)
265+
e.reconciler.QueueManager.SendPopOutPipelineIDToEtcd(pipelineID)
266266
}
267267

268268
return httpserver.OkResp(nil)

modules/pipeline/pipengine/reconciler/before_listen.go

+3
Original file line numberDiff line numberDiff line change
@@ -38,5 +38,8 @@ func (r *Reconciler) beforeListen(ctx context.Context) error {
3838
go func() {
3939
r.QueueManager.ListenUpdatePriorityPipelineIDsFromEtcd(ctx)
4040
}()
41+
go func() {
42+
r.QueueManager.ListenPopOutPipelineIDFromEtcd(ctx)
43+
}()
4144
return nil
4245
}

modules/pipeline/pipengine/reconciler/queuemanage/manager/queue.go

+52-7
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,9 @@ import (
3131
)
3232

3333
const (
34-
etcdQueueWatchPrefix = "/devops/pipeline/queue_manager/actions/update/"
35-
etcdQueuePipelineWatchPrefix = "/devops/pipeline/queue_manager/actions/batch-update/"
34+
etcdQueueWatchPrefix = "/devops/pipeline/queue_manager/actions/update/"
35+
etcdQueuePipelineWatchPrefix = "/devops/pipeline/queue_manager/actions/batch-update/"
36+
etcdQueuePopOutPipelineWatchPrefix = "/devops/pipeline/queue_manager/actions/pop-out-pipeline/"
3637
)
3738

3839
var (
@@ -75,6 +76,18 @@ func (mgr *defaultManager) SendQueueToEtcd(queueID uint64) {
7576
})
7677
}
7778

79+
func (mgr *defaultManager) SendPopOutPipelineIDToEtcd(pipelineID uint64) {
80+
_ = loop.New(loop.WithDeclineRatio(2), loop.WithDeclineLimit(time.Second*60)).Do(func() (abort bool, err error) {
81+
err = mgr.js.Put(context.Background(), fmt.Sprintf("%s%d", etcdQueuePopOutPipelineWatchPrefix, pipelineID), nil)
82+
if err != nil {
83+
logrus.Errorf("%s: send pop out pipeline to etcd failed, pipelineID: %d, err: %v", defaultQueueManagerLogPrefix, pipelineID, err)
84+
return false, err
85+
}
86+
logrus.Infof("%s: send pop out pipeline to etcd successfully, pipelineID: %d", defaultQueueManagerLogPrefix, pipelineID)
87+
return true, nil
88+
})
89+
}
90+
7891
func (mgr *defaultManager) SendUpdatePriorityPipelineIDsToEtcd(queueID uint64, pipelineIDS []uint64) {
7992
_ = loop.New(loop.WithDeclineRatio(2), loop.WithDeclineLimit(time.Second*60)).Do(func() (abort bool, err error) {
8093
err = mgr.js.Put(context.Background(), fmt.Sprintf("%s%d", etcdQueuePipelineWatchPrefix, queueID), pipelineIDS)
@@ -99,7 +112,7 @@ func (mgr *defaultManager) ListenInputQueueFromEtcd(ctx context.Context) {
99112
func(key string, _ interface{}, t storetypes.ChangeType) (_ error) {
100113
go func() {
101114
logrus.Infof("%s: watched a key change: %s, changeType", key, t.String())
102-
queueID, err := parseQueueIDFromWatchedKey(key, etcdQueueWatchPrefix)
115+
queueID, err := parseIDFromWatchedKey(key, etcdQueueWatchPrefix)
103116
if err != nil {
104117
logrus.Errorf("%s: failed to parse queueID from watched key, key: %s, err: %v", defaultQueueManagerLogPrefix, key, err)
105118
return
@@ -133,7 +146,7 @@ func (mgr *defaultManager) ListenUpdatePriorityPipelineIDsFromEtcd(ctx context.C
133146
_ = mgr.js.IncludeWatch().Watch(ctx, etcdQueuePipelineWatchPrefix, true, true, false, []uint64{},
134147
func(key string, value interface{}, t storetypes.ChangeType) (_ error) {
135148
logrus.Infof("%s: watched a key change: %s, value: %v, changeType", key, value, t.String())
136-
queueID, err := parseQueueIDFromWatchedKey(key, etcdQueuePipelineWatchPrefix)
149+
queueID, err := parseIDFromWatchedKey(key, etcdQueuePipelineWatchPrefix)
137150
if err != nil {
138151
logrus.Errorf("%s: failed to parse queueID from watched key, key: %s, err: %v", defaultQueueManagerLogPrefix, key, err)
139152
return
@@ -164,7 +177,39 @@ func (mgr *defaultManager) ListenUpdatePriorityPipelineIDsFromEtcd(ctx context.C
164177
}
165178
}
166179

167-
func parseQueueIDFromWatchedKey(key string, prefixKey string) (uint64, error) {
168-
pipelineIDStr := strutil.TrimPrefixes(key, prefixKey)
169-
return strconv.ParseUint(pipelineIDStr, 10, 64)
180+
func (mgr *defaultManager) ListenPopOutPipelineIDFromEtcd(ctx context.Context) {
181+
logrus.Infof("%s: start listen pop out pipeline id", defaultQueueManagerLogPrefix)
182+
for {
183+
select {
184+
case <-ctx.Done():
185+
return
186+
default:
187+
_ = mgr.js.IncludeWatch().Watch(ctx, etcdQueuePopOutPipelineWatchPrefix, true, true, true, nil,
188+
func(key string, _ interface{}, t storetypes.ChangeType) (_ error) {
189+
go func() {
190+
defer func() {
191+
if err := mgr.js.Remove(ctx, key, nil); err != nil {
192+
logrus.Errorf("%s: failed to delete pop out pipeline key, key: %d, err: %v", defaultQueueManagerLogPrefix, key, err)
193+
}
194+
}()
195+
logrus.Infof("%s: watched a key change: %s, changeType", key, t.String())
196+
pipelineID, err := parseIDFromWatchedKey(key, etcdQueuePopOutPipelineWatchPrefix)
197+
if err != nil {
198+
logrus.Errorf("%s: failed to parse pipelineID from watched key, key: %s, err: %v", defaultQueueManagerLogPrefix, key, err)
199+
return
200+
}
201+
202+
mgr.PopOutPipelineFromQueue(pipelineID)
203+
logrus.Infof("%s: pop out pipeline from queue successfully, pipeline id: %d", defaultQueueManagerLogPrefix, pipelineID)
204+
}()
205+
206+
return nil
207+
})
208+
}
209+
}
210+
}
211+
212+
func parseIDFromWatchedKey(key string, prefixKey string) (uint64, error) {
213+
IDStr := strutil.TrimPrefixes(key, prefixKey)
214+
return strconv.ParseUint(IDStr, 10, 64)
170215
}

modules/pipeline/pipengine/reconciler/queuemanage/types/manager.go

+2
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,7 @@ type QueueManager interface {
3434
ListenInputQueueFromEtcd(ctx context.Context)
3535
SendUpdatePriorityPipelineIDsToEtcd(queueID uint64, pipelineIDS []uint64)
3636
ListenUpdatePriorityPipelineIDsFromEtcd(ctx context.Context)
37+
SendPopOutPipelineIDToEtcd(pipelineID uint64)
38+
ListenPopOutPipelineIDFromEtcd(ctx context.Context)
3739
snapshot.Snapshot
3840
}

0 commit comments

Comments
 (0)