@@ -31,8 +31,9 @@ import (
31
31
)
32
32
33
33
const (
34
- etcdQueueWatchPrefix = "/devops/pipeline/queue_manager/actions/update/"
35
- etcdQueuePipelineWatchPrefix = "/devops/pipeline/queue_manager/actions/batch-update/"
34
+ etcdQueueWatchPrefix = "/devops/pipeline/queue_manager/actions/update/"
35
+ etcdQueuePipelineWatchPrefix = "/devops/pipeline/queue_manager/actions/batch-update/"
36
+ etcdQueuePopOutPipelineWatchPrefix = "/devops/pipeline/queue_manager/actions/pop-out-pipeline/"
36
37
)
37
38
38
39
var (
@@ -75,6 +76,18 @@ func (mgr *defaultManager) SendQueueToEtcd(queueID uint64) {
75
76
})
76
77
}
77
78
79
+ func (mgr * defaultManager ) SendPopOutPipelineIDToEtcd (pipelineID uint64 ) {
80
+ _ = loop .New (loop .WithDeclineRatio (2 ), loop .WithDeclineLimit (time .Second * 60 )).Do (func () (abort bool , err error ) {
81
+ err = mgr .js .Put (context .Background (), fmt .Sprintf ("%s%d" , etcdQueuePopOutPipelineWatchPrefix , pipelineID ), nil )
82
+ if err != nil {
83
+ logrus .Errorf ("%s: send pop out pipeline to etcd failed, pipelineID: %d, err: %v" , defaultQueueManagerLogPrefix , pipelineID , err )
84
+ return false , err
85
+ }
86
+ logrus .Infof ("%s: send pop out pipeline to etcd successfully, pipelineID: %d" , defaultQueueManagerLogPrefix , pipelineID )
87
+ return true , nil
88
+ })
89
+ }
90
+
78
91
func (mgr * defaultManager ) SendUpdatePriorityPipelineIDsToEtcd (queueID uint64 , pipelineIDS []uint64 ) {
79
92
_ = loop .New (loop .WithDeclineRatio (2 ), loop .WithDeclineLimit (time .Second * 60 )).Do (func () (abort bool , err error ) {
80
93
err = mgr .js .Put (context .Background (), fmt .Sprintf ("%s%d" , etcdQueuePipelineWatchPrefix , queueID ), pipelineIDS )
@@ -99,7 +112,7 @@ func (mgr *defaultManager) ListenInputQueueFromEtcd(ctx context.Context) {
99
112
func (key string , _ interface {}, t storetypes.ChangeType ) (_ error ) {
100
113
go func () {
101
114
logrus .Infof ("%s: watched a key change: %s, changeType" , key , t .String ())
102
- queueID , err := parseQueueIDFromWatchedKey (key , etcdQueueWatchPrefix )
115
+ queueID , err := parseIDFromWatchedKey (key , etcdQueueWatchPrefix )
103
116
if err != nil {
104
117
logrus .Errorf ("%s: failed to parse queueID from watched key, key: %s, err: %v" , defaultQueueManagerLogPrefix , key , err )
105
118
return
@@ -133,7 +146,7 @@ func (mgr *defaultManager) ListenUpdatePriorityPipelineIDsFromEtcd(ctx context.C
133
146
_ = mgr .js .IncludeWatch ().Watch (ctx , etcdQueuePipelineWatchPrefix , true , true , false , []uint64 {},
134
147
func (key string , value interface {}, t storetypes.ChangeType ) (_ error ) {
135
148
logrus .Infof ("%s: watched a key change: %s, value: %v, changeType" , key , value , t .String ())
136
- queueID , err := parseQueueIDFromWatchedKey (key , etcdQueuePipelineWatchPrefix )
149
+ queueID , err := parseIDFromWatchedKey (key , etcdQueuePipelineWatchPrefix )
137
150
if err != nil {
138
151
logrus .Errorf ("%s: failed to parse queueID from watched key, key: %s, err: %v" , defaultQueueManagerLogPrefix , key , err )
139
152
return
@@ -164,7 +177,39 @@ func (mgr *defaultManager) ListenUpdatePriorityPipelineIDsFromEtcd(ctx context.C
164
177
}
165
178
}
166
179
167
- func parseQueueIDFromWatchedKey (key string , prefixKey string ) (uint64 , error ) {
168
- pipelineIDStr := strutil .TrimPrefixes (key , prefixKey )
169
- return strconv .ParseUint (pipelineIDStr , 10 , 64 )
180
+ func (mgr * defaultManager ) ListenPopOutPipelineIDFromEtcd (ctx context.Context ) {
181
+ logrus .Infof ("%s: start listen pop out pipeline id" , defaultQueueManagerLogPrefix )
182
+ for {
183
+ select {
184
+ case <- ctx .Done ():
185
+ return
186
+ default :
187
+ _ = mgr .js .IncludeWatch ().Watch (ctx , etcdQueuePopOutPipelineWatchPrefix , true , true , true , nil ,
188
+ func (key string , _ interface {}, t storetypes.ChangeType ) (_ error ) {
189
+ go func () {
190
+ defer func () {
191
+ if err := mgr .js .Remove (ctx , key , nil ); err != nil {
192
+ logrus .Errorf ("%s: failed to delete pop out pipeline key, key: %s, err: %v" , defaultQueueManagerLogPrefix , key , err )
193
+ }
194
+ }()
195
+ logrus .Infof ("%s: watched a key change: %s, changeType" , key , t .String ())
196
+ pipelineID , err := parseIDFromWatchedKey (key , etcdQueuePopOutPipelineWatchPrefix )
197
+ if err != nil {
198
+ logrus .Errorf ("%s: failed to parse pipelineID from watched key, key: %s, err: %v" , defaultQueueManagerLogPrefix , key , err )
199
+ return
200
+ }
201
+
202
+ mgr .PopOutPipelineFromQueue (pipelineID )
203
+ logrus .Infof ("%s: pop out pipeline from queue successfully, pipeline id: %d" , defaultQueueManagerLogPrefix , pipelineID )
204
+ }()
205
+
206
+ return nil
207
+ })
208
+ }
209
+ }
210
+ }
211
+
212
+ func parseIDFromWatchedKey (key string , prefixKey string ) (uint64 , error ) {
213
+ IDStr := strutil .TrimPrefixes (key , prefixKey )
214
+ return strconv .ParseUint (IDStr , 10 , 64 )
170
215
}
0 commit comments