@@ -25,9 +25,12 @@ import (
25
25
"fmt"
26
26
"io"
27
27
"os"
28
+ "slices"
28
29
"strings"
29
30
30
31
"github.com/urfave/cli/v2"
32
+ "go.uber.org/multierr"
33
+ "golang.org/x/exp/maps"
31
34
32
35
"github.com/uber/cadence/client/frontend"
33
36
"github.com/uber/cadence/common"
@@ -37,19 +40,22 @@ import (
37
40
38
41
type (
39
42
TaskListRow struct {
40
- Name string `header:"Task List Name"`
41
- Type string `header:"Type "`
42
- PollerCount int `header:"Poller Count"`
43
+ Name string `header:"Task List Name"`
44
+ ActivityPollerCount int `header:"Activity Poller Count "`
45
+ DecisionPollerCount int `header:"Decision Poller Count"`
43
46
}
44
47
TaskListStatusRow struct {
45
- ReadLevel int64 `header:"Read Level"`
46
- AckLevel int64 `header:"Ack Level"`
47
- Backlog int64 `header:"Backlog"`
48
- RPS float64 `header:"RPS"`
49
- StartID int64 `header:"Lease Start TaskID"`
50
- EndID int64 `header:"Lease End TaskID"`
48
+ Type string `header:"Type"`
49
+ PollerCount int `header:"PollerCount"`
50
+ ReadLevel int64 `header:"Read Level"`
51
+ AckLevel int64 `header:"Ack Level"`
52
+ Backlog int64 `header:"Backlog"`
53
+ RPS float64 `header:"RPS"`
54
+ StartID int64 `header:"Lease Start TaskID"`
55
+ EndID int64 `header:"Lease End TaskID"`
51
56
}
52
57
TaskListPartitionConfigRow struct {
58
+ Type string `header:"Type"`
53
59
Version int64 `header:"Version"`
54
60
ReadPartitions map [int ]* types.TaskListPartition `header:"Read Partitions"`
55
61
WritePartitions map [int ]* types.TaskListPartition `header:"Write Partitions"`
@@ -70,47 +76,46 @@ func AdminDescribeTaskList(c *cli.Context) error {
70
76
if err != nil {
71
77
return commoncli .Problem ("Required flag not found: " , err )
72
78
}
73
- taskListType := types . TaskListTypeDecision
74
- if strings . ToLower ( c . String ( FlagTaskListType )) == "activity" {
75
- taskListType = types . TaskListTypeActivity
79
+ taskListTypes , err := getTaskListTypes ( c )
80
+ if err != nil {
81
+ return err
76
82
}
77
83
78
84
ctx , cancel , err := newContext (c )
79
85
defer cancel ()
80
86
if err != nil {
81
87
return commoncli .Problem ("Error in creating context:" , err )
82
88
}
83
- request := & types.DescribeTaskListRequest {
84
- Domain : domain ,
85
- TaskList : & types.TaskList {Name : taskList },
86
- TaskListType : & taskListType ,
87
- IncludeTaskListStatus : true ,
88
- }
89
+ responses := make (map [types.TaskListType ]* types.DescribeTaskListResponse )
90
+ for _ , tlType := range taskListTypes {
91
+ request := & types.DescribeTaskListRequest {
92
+ Domain : domain ,
93
+ TaskList : & types.TaskList {Name : taskList },
94
+ TaskListType : tlType .Ptr (),
95
+ IncludeTaskListStatus : true ,
96
+ }
89
97
90
- response , err := frontendClient .DescribeTaskList (ctx , request )
91
- if err != nil {
92
- return commoncli .Problem ("Operation DescribeTaskList failed." , err )
98
+ response , err := frontendClient .DescribeTaskList (ctx , request )
99
+ if err != nil {
100
+ return commoncli .Problem ("Operation DescribeTaskList failed for type: " + tlType .String (), err )
101
+ }
102
+ responses [tlType ] = response
93
103
}
94
-
95
- taskListStatus := response .GetTaskListStatus ()
96
- if taskListStatus == nil {
97
- return commoncli .Problem (colorMagenta ("No tasklist status information." ), nil )
104
+ if c .String (FlagFormat ) == formatJSON {
105
+ prettyPrintJSONObject (getDeps (c ).Output (), responses )
106
+ return nil
98
107
}
99
- if err := printTaskListStatus (getDeps (c ).Output (), taskListStatus ); err != nil {
108
+
109
+ if err := printTaskListStatus (getDeps (c ).Output (), responses ); err != nil {
100
110
return fmt .Errorf ("failed to print task list status: %w" , err )
101
111
}
102
112
getDeps (c ).Output ().Write ([]byte ("\n " ))
103
- if response .PartitionConfig != nil {
104
- if err := printTaskListPartitionConfig (getDeps (c ).Output (), response .PartitionConfig ); err != nil {
105
- return fmt .Errorf ("failed to print task list partition config: %w" , err )
106
- }
107
- getDeps (c ).Output ().Write ([]byte ("\n " ))
108
- }
109
- pollers := response .Pollers
110
- if len (pollers ) == 0 {
111
- return commoncli .Problem (colorMagenta ("No poller for tasklist: " + taskList ), nil )
113
+ if err := printTaskListPartitionConfig (getDeps (c ).Output (), responses ); err != nil {
114
+ return fmt .Errorf ("failed to print task list partition config: %w" , err )
112
115
}
113
- return printTaskListPollers (getDeps (c ).Output (), pollers , taskListType )
116
+ getDeps (c ).Output ().Write ([]byte ("\n " ))
117
+
118
+ return nil
114
119
}
115
120
116
121
// AdminListTaskList displays all task lists under a domain.
@@ -137,35 +142,74 @@ func AdminListTaskList(c *cli.Context) error {
137
142
return commoncli .Problem ("Operation GetTaskListByDomain failed." , err )
138
143
}
139
144
145
+ if c .String (FlagFormat ) == formatJSON {
146
+ prettyPrintJSONObject (getDeps (c ).Output (), response )
147
+ return nil
148
+ }
149
+
140
150
fmt .Println ("Task Lists for domain " + domain + ":" )
141
- table := [ ]TaskListRow {}
151
+ tlByName := make ( map [ string ]TaskListRow )
142
152
for name , taskList := range response .GetDecisionTaskListMap () {
143
- table = append (table , TaskListRow {name , "Decision" , len (taskList .GetPollers ())})
153
+ row := tlByName [name ]
154
+ row .Name = name
155
+ row .DecisionPollerCount = len (taskList .GetPollers ())
156
+ tlByName [name ] = row
144
157
}
145
158
for name , taskList := range response .GetActivityTaskListMap () {
146
- table = append (table , TaskListRow {name , "Activity" , len (taskList .GetPollers ())})
147
- }
159
+ row := tlByName [name ]
160
+ row .Name = name
161
+ row .ActivityPollerCount = len (taskList .GetPollers ())
162
+ tlByName [name ] = row
163
+ }
164
+ table := maps .Values (tlByName )
165
+ slices .SortFunc (table , func (a , b TaskListRow ) int {
166
+ return strings .Compare (a .Name , b .Name )
167
+ })
148
168
return RenderTable (os .Stdout , table , RenderOptions {Color : true , Border : true })
149
169
}
150
170
151
- func printTaskListStatus (w io.Writer , taskListStatus * types.TaskListStatus ) error {
152
- table := []TaskListStatusRow {{
153
- ReadLevel : taskListStatus .GetReadLevel (),
154
- AckLevel : taskListStatus .GetAckLevel (),
155
- Backlog : taskListStatus .GetBacklogCountHint (),
156
- RPS : taskListStatus .GetRatePerSecond (),
157
- StartID : taskListStatus .GetTaskIDBlock ().GetStartID (),
158
- EndID : taskListStatus .GetTaskIDBlock ().GetEndID (),
159
- }}
171
+ func printTaskListStatus (w io.Writer , responses map [types.TaskListType ]* types.DescribeTaskListResponse ) error {
172
+ var table []TaskListStatusRow
173
+ for tlType , response := range responses {
174
+ taskListStatus := response .TaskListStatus
175
+ table = append (table , TaskListStatusRow {
176
+ Type : tlType .String (),
177
+ PollerCount : len (response .Pollers ),
178
+ ReadLevel : taskListStatus .GetReadLevel (),
179
+ AckLevel : taskListStatus .GetAckLevel (),
180
+ Backlog : taskListStatus .GetBacklogCountHint (),
181
+ RPS : taskListStatus .GetRatePerSecond (),
182
+ StartID : taskListStatus .GetTaskIDBlock ().GetStartID (),
183
+ EndID : taskListStatus .GetTaskIDBlock ().GetEndID (),
184
+ })
185
+ }
186
+ slices .SortFunc (table , func (a , b TaskListStatusRow ) int {
187
+ return strings .Compare (a .Type , b .Type )
188
+ })
160
189
return RenderTable (w , table , RenderOptions {Color : true })
161
190
}
162
191
163
- func printTaskListPartitionConfig (w io.Writer , config * types.TaskListPartitionConfig ) error {
164
- table := TaskListPartitionConfigRow {
165
- Version : config .Version ,
166
- ReadPartitions : config .ReadPartitions ,
167
- WritePartitions : config .WritePartitions ,
168
- }
192
+ func printTaskListPartitionConfig (w io.Writer , responses map [types.TaskListType ]* types.DescribeTaskListResponse ) error {
193
+ var table []TaskListPartitionConfigRow
194
+ for tlType , response := range responses {
195
+ config := response .PartitionConfig
196
+ if config == nil {
197
+ config = & types.TaskListPartitionConfig {
198
+ Version : 0 ,
199
+ ReadPartitions : createPartitions (1 ),
200
+ WritePartitions : createPartitions (1 ),
201
+ }
202
+ }
203
+ table = append (table , TaskListPartitionConfigRow {
204
+ Type : tlType .String (),
205
+ Version : config .Version ,
206
+ ReadPartitions : config .ReadPartitions ,
207
+ WritePartitions : config .WritePartitions ,
208
+ })
209
+ }
210
+ slices .SortFunc (table , func (a , b TaskListPartitionConfigRow ) int {
211
+ return strings .Compare (a .Type , b .Type )
212
+ })
169
213
return RenderTable (w , table , RenderOptions {Color : true })
170
214
}
171
215
@@ -187,13 +231,9 @@ func AdminUpdateTaskListPartitionConfig(c *cli.Context) error {
187
231
return commoncli .Problem ("Required flag not found: " , err )
188
232
}
189
233
force := c .Bool (FlagForce )
190
- var taskListType * types.TaskListType
191
- if strings .ToLower (c .String (FlagTaskListType )) == "activity" {
192
- taskListType = types .TaskListTypeActivity .Ptr ()
193
- } else if strings .ToLower (c .String (FlagTaskListType )) == "decision" {
194
- taskListType = types .TaskListTypeDecision .Ptr ()
195
- } else {
196
- return commoncli .Problem ("Invalid task list type: valid types are [activity, decision]" , nil )
234
+ taskListTypes , err := getTaskListTypes (c )
235
+ if err != nil {
236
+ return err
197
237
}
198
238
numReadPartitions , err := getRequiredIntOption (c , FlagNumReadPartitions )
199
239
if err != nil {
@@ -214,38 +254,56 @@ func AdminUpdateTaskListPartitionConfig(c *cli.Context) error {
214
254
}
215
255
tl := & types.TaskList {Name : taskList , Kind : types .TaskListKindNormal .Ptr ()}
216
256
if ! force {
217
- err = validateChange (ctx , frontendClient , domain , tl , taskListType , cfg )
218
- if err != nil {
219
- return commoncli .Problem ("Potentially unsafe operation. Specify '--force' to proceed anyway: " , err )
257
+ hasPollers := false
258
+ var errors []error
259
+ for _ , tlType := range taskListTypes {
260
+ typeHasPollers , typeErr := validateChange (ctx , frontendClient , domain , tl , tlType .Ptr (), cfg )
261
+ if typeErr != nil {
262
+ errors = append (errors , fmt .Errorf ("%s:%s failed validation: %w" , tl .Name , tlType , typeErr ))
263
+ }
264
+ hasPollers = typeHasPollers || hasPollers
265
+ }
266
+ if len (errors ) > 0 {
267
+ return commoncli .Problem ("Potentially unsafe operation. Specify '--force' to proceed anyway" , multierr .Combine (errors ... ))
268
+ }
269
+ if ! hasPollers {
270
+ return commoncli .Problem (fmt .Sprintf ("Operation is safe but %s has no pollers of the specified types. Is the name correct? Specify '--force' to proceed anyway" , tl .Name ), nil )
220
271
}
221
272
}
222
- _ , err = adminClient .UpdateTaskListPartitionConfig (ctx , & types.UpdateTaskListPartitionConfigRequest {
223
- Domain : domain ,
224
- TaskList : tl ,
225
- TaskListType : taskListType ,
226
- PartitionConfig : cfg ,
227
- })
228
- if err != nil {
229
- return commoncli .Problem ("Operation UpdateTaskListPartitionConfig failed." , err )
273
+ for _ , tlType := range taskListTypes {
274
+ _ , err = adminClient .UpdateTaskListPartitionConfig (ctx , & types.UpdateTaskListPartitionConfigRequest {
275
+ Domain : domain ,
276
+ TaskList : tl ,
277
+ TaskListType : tlType .Ptr (),
278
+ PartitionConfig : cfg ,
279
+ })
280
+ if err != nil {
281
+ return commoncli .Problem ("Operation UpdateTaskListPartitionConfig failed for type: " + tlType .String (), err )
282
+ }
283
+ msg := fmt .Sprintf ("Successfully updated %s:%s" , tl .Name , tlType )
284
+ _ , err = getDeps (c ).Output ().Write ([]byte (msg ))
285
+ if err != nil {
286
+ return err
287
+ }
230
288
}
231
289
return nil
232
290
}
233
291
234
- func validateChange (ctx context.Context , client frontend.Client , domain string , tl * types.TaskList , tlt * types.TaskListType , newCfg * types.TaskListPartitionConfig ) error {
292
+ func validateChange (ctx context.Context , client frontend.Client , domain string , tl * types.TaskList , tlt * types.TaskListType , newCfg * types.TaskListPartitionConfig ) ( bool , error ) {
235
293
description , err := client .DescribeTaskList (ctx , & types.DescribeTaskListRequest {
236
294
Domain : domain ,
237
295
TaskList : tl ,
238
296
TaskListType : tlt ,
239
297
})
240
298
if err != nil {
241
- return fmt .Errorf ("DescribeTaskList failed: %w" , err )
299
+ return false , fmt .Errorf ("DescribeTaskList failed: %w" , err )
242
300
}
243
301
// Illegal operations are rejected by the server (read < write), but unsafe ones are still allowed
244
302
if description .PartitionConfig != nil {
245
303
oldCfg := description .PartitionConfig
246
304
// Ensure they're not removing active write partitions
247
305
if len (newCfg .ReadPartitions ) < len (oldCfg .WritePartitions ) {
248
- return fmt .Errorf ("remove write partitions, then read partitions. Removing an active write partition risks losing tasks. Proposed read count is less than current write count (%d < %d)" , len (newCfg .ReadPartitions ), len (oldCfg .WritePartitions ))
306
+ return false , fmt .Errorf ("remove write partitions, then read partitions. Removing an active write partition risks losing tasks. Proposed read count is less than current write count (%d < %d)" , len (newCfg .ReadPartitions ), len (oldCfg .WritePartitions ))
249
307
}
250
308
// Ensure removed read partitions are drained
251
309
for i := len (newCfg .ReadPartitions ); i < len (oldCfg .ReadPartitions ); i ++ {
@@ -256,18 +314,15 @@ func validateChange(ctx context.Context, client frontend.Client, domain string,
256
314
IncludeTaskListStatus : true ,
257
315
})
258
316
if err != nil {
259
- return fmt .Errorf ("DescribeTaskList failed for partition %d: %w" , i , err )
317
+ return false , fmt .Errorf ("DescribeTaskList failed for partition %d: %w" , i , err )
260
318
}
261
319
if partition .TaskListStatus .BacklogCountHint != 0 {
262
- return fmt .Errorf ("partition %d still has %d tasks remaining" , i , partition .TaskListStatus .BacklogCountHint )
320
+ return false , fmt .Errorf ("partition %d still has %d tasks remaining" , i , partition .TaskListStatus .BacklogCountHint )
263
321
}
264
322
}
265
323
}
266
324
// If it's otherwise valid but there are no pollers, they might have mistyped the name
267
- if len (description .Pollers ) == 0 {
268
- return fmt .Errorf ("'%s' has no pollers of type '%s'" , tl .Name , tlt .String ())
269
- }
270
- return nil
325
+ return len (description .Pollers ) > 0 , nil
271
326
}
272
327
273
328
func createPartitions (num int ) map [int ]* types.TaskListPartition {
@@ -284,3 +339,17 @@ func getPartitionTaskListName(root string, partition int) string {
284
339
}
285
340
return fmt .Sprintf ("%v%v/%v" , common .ReservedTaskListPrefix , root , partition )
286
341
}
342
+
343
+ func getTaskListTypes (c * cli.Context ) ([]types.TaskListType , error ) {
344
+ var taskListTypes []types.TaskListType
345
+ if strings .ToLower (c .String (FlagTaskListType )) == "activity" {
346
+ taskListTypes = []types.TaskListType {types .TaskListTypeActivity }
347
+ } else if strings .ToLower (c .String (FlagTaskListType )) == "decision" {
348
+ taskListTypes = []types.TaskListType {types .TaskListTypeDecision }
349
+ } else if c .String (FlagTaskListType ) == "" {
350
+ taskListTypes = []types.TaskListType {types .TaskListTypeActivity , types .TaskListTypeDecision }
351
+ } else {
352
+ return nil , commoncli .Problem ("Invalid task list type: valid types are [activity, decision]" , nil )
353
+ }
354
+ return taskListTypes , nil
355
+ }
0 commit comments