62
62
import java .util .HashMap ;
63
63
import java .util .HashSet ;
64
64
import java .util .Iterator ;
65
+ import java .util .Locale ;
65
66
import java .util .Map ;
66
67
import java .util .Set ;
67
68
69
+ import static org .opensearch .cluster .action .shard .ShardStateAction .FOLLOW_UP_REROUTE_PRIORITY_SETTING ;
68
70
import static org .opensearch .cluster .routing .allocation .ConstraintTypes .CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID ;
69
71
import static org .opensearch .cluster .routing .allocation .ConstraintTypes .CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID ;
70
72
import static org .opensearch .cluster .routing .allocation .ConstraintTypes .INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID ;
@@ -199,6 +201,32 @@ public class BalancedShardsAllocator implements ShardsAllocator {
199
201
Setting .Property .Dynamic
200
202
);
201
203
204
+ /**
205
+ * Adjusts the priority of the followup reroute task when current round times out. NORMAL is right for reasonable clusters,
206
+ * but for a cluster in a messed up state which is starving NORMAL priority tasks, it might be necessary to raise this higher
207
+ * to allocate shards.
208
+ */
209
+ public static final Setting <Priority > FOLLOW_UP_REROUTE_PRIORITY_SETTING = new Setting <>(
210
+ "cluster.routing.allocation.balanced_shards_allocator.schedule_reroute.priority" ,
211
+ Priority .NORMAL .toString (),
212
+ BalancedShardsAllocator ::parseReroutePriority ,
213
+ Setting .Property .NodeScope ,
214
+ Setting .Property .Dynamic
215
+ );
216
+
217
+ private static Priority parseReroutePriority (String priorityString ) {
218
+ final Priority priority = Priority .valueOf (priorityString .toUpperCase (Locale .ROOT ));
219
+ switch (priority ) {
220
+ case NORMAL :
221
+ case HIGH :
222
+ case URGENT :
223
+ return priority ;
224
+ }
225
+ throw new IllegalArgumentException (
226
+ "priority [" + priority + "] not supported for [" + FOLLOW_UP_REROUTE_PRIORITY_SETTING .getKey () + "]"
227
+ );
228
+ }
229
+
202
230
private volatile boolean movePrimaryFirst ;
203
231
private volatile ShardMovementStrategy shardMovementStrategy ;
204
232
@@ -213,6 +241,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
213
241
214
242
private volatile boolean ignoreThrottleInRestore ;
215
243
private volatile TimeValue allocatorTimeout ;
244
+ private volatile Priority followUpRerouteTaskPriority ;
216
245
private long startTime ;
217
246
private RerouteService rerouteService ;
218
247
@@ -233,6 +262,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
233
262
setPreferPrimaryShardRebalance (PREFER_PRIMARY_SHARD_REBALANCE .get (settings ));
234
263
setShardMovementStrategy (SHARD_MOVEMENT_STRATEGY_SETTING .get (settings ));
235
264
setAllocatorTimeout (ALLOCATOR_TIMEOUT_SETTING .get (settings ));
265
+ setFollowUpRerouteTaskPriority (FOLLOW_UP_REROUTE_PRIORITY_SETTING .get (settings ));
236
266
clusterSettings .addSettingsUpdateConsumer (PREFER_PRIMARY_SHARD_BALANCE , this ::setPreferPrimaryShardBalance );
237
267
clusterSettings .addSettingsUpdateConsumer (SHARD_MOVE_PRIMARY_FIRST_SETTING , this ::setMovePrimaryFirst );
238
268
clusterSettings .addSettingsUpdateConsumer (SHARD_MOVEMENT_STRATEGY_SETTING , this ::setShardMovementStrategy );
@@ -244,6 +274,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
244
274
clusterSettings .addSettingsUpdateConsumer (PRIMARY_CONSTRAINT_THRESHOLD_SETTING , this ::setPrimaryConstraintThresholdSetting );
245
275
clusterSettings .addSettingsUpdateConsumer (IGNORE_THROTTLE_FOR_REMOTE_RESTORE , this ::setIgnoreThrottleInRestore );
246
276
clusterSettings .addSettingsUpdateConsumer (ALLOCATOR_TIMEOUT_SETTING , this ::setAllocatorTimeout );
277
+ clusterSettings .addSettingsUpdateConsumer (FOLLOW_UP_REROUTE_PRIORITY_SETTING , this ::setFollowUpRerouteTaskPriority );
247
278
}
248
279
249
280
@ Override
@@ -342,6 +373,10 @@ private void setAllocatorTimeout(TimeValue allocatorTimeout) {
342
373
this .allocatorTimeout = allocatorTimeout ;
343
374
}
344
375
376
+ private void setFollowUpRerouteTaskPriority (Priority followUpRerouteTaskPriority ) {
377
+ this .followUpRerouteTaskPriority = followUpRerouteTaskPriority ;
378
+ }
379
+
345
380
protected boolean allocatorTimedOut () {
346
381
if (allocatorTimeout .equals (TimeValue .MINUS_ONE )) {
347
382
if (logger .isTraceEnabled ()) {
@@ -438,10 +473,13 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) {
438
473
439
474
private void scheduleRerouteIfAllocatorTimedOut () {
440
475
if (allocatorTimedOut ()) {
441
- assert rerouteService != null : "RerouteService not set to schedule reroute after allocator time out" ;
476
+ if (rerouteService == null ) {
477
+ logger .info ("RerouteService not set to schedule reroute after allocator time out" );
478
+ return ;
479
+ }
442
480
rerouteService .reroute (
443
481
"reroute after balanced shards allocator timed out" ,
444
- Priority . HIGH ,
482
+ followUpRerouteTaskPriority ,
445
483
ActionListener .wrap (
446
484
r -> logger .trace ("reroute after balanced shards allocator timed out completed" ),
447
485
e -> logger .debug ("reroute after balanced shards allocator timed out failed" , e )
0 commit comments