|
53 | 53 | import java.util.HashSet;
|
54 | 54 | import java.util.Iterator;
|
55 | 55 | import java.util.List;
|
| 56 | +import java.util.Locale; |
56 | 57 | import java.util.Map;
|
57 | 58 | import java.util.Objects;
|
58 | 59 | import java.util.Set;
|
@@ -82,6 +83,7 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator {
|
82 | 83 |
|
83 | 84 | private TimeValue primaryShardsBatchGatewayAllocatorTimeout;
|
84 | 85 | private TimeValue replicaShardsBatchGatewayAllocatorTimeout;
|
| 86 | + private volatile Priority followUpRerouteTaskPriority; |
85 | 87 | public static final TimeValue MIN_ALLOCATOR_TIMEOUT = TimeValue.timeValueSeconds(20);
|
86 | 88 | private final ClusterManagerMetrics clusterManagerMetrics;
|
87 | 89 |
|
@@ -145,6 +147,32 @@ public void validate(TimeValue timeValue) {
|
145 | 147 | Setting.Property.Dynamic
|
146 | 148 | );
|
147 | 149 |
|
| 150 | + /** |
| 151 | + * Adjusts the priority of the followup reroute task when current round times out. NORMAL is right for reasonable clusters, |
| 152 | + * but for a cluster in a messed up state which is starving NORMAL priority tasks, it might be necessary to raise this higher |
| 153 | + * to allocate existing shards. |
| 154 | + */ |
| 155 | + public static final Setting<Priority> FOLLOW_UP_REROUTE_PRIORITY_SETTING = new Setting<>( |
| 156 | + "cluster.routing.allocation.shards_batch_gateway_allocator.schedule_reroute.priority", |
| 157 | + Priority.NORMAL.toString(), |
| 158 | + ShardsBatchGatewayAllocator::parseReroutePriority, |
| 159 | + Setting.Property.NodeScope, |
| 160 | + Setting.Property.Dynamic |
| 161 | + ); |
| 162 | + |
| 163 | + private static Priority parseReroutePriority(String priorityString) { |
| 164 | + final Priority priority = Priority.valueOf(priorityString.toUpperCase(Locale.ROOT)); |
| 165 | + switch (priority) { |
| 166 | + case NORMAL: |
| 167 | + case HIGH: |
| 168 | + case URGENT: |
| 169 | + return priority; |
| 170 | + } |
| 171 | + throw new IllegalArgumentException( |
| 172 | + "priority [" + priority + "] not supported for [" + FOLLOW_UP_REROUTE_PRIORITY_SETTING.getKey() + "]" |
| 173 | + ); |
| 174 | + } |
| 175 | + |
148 | 176 | private final RerouteService rerouteService;
|
149 | 177 | private final PrimaryShardBatchAllocator primaryShardBatchAllocator;
|
150 | 178 | private final ReplicaShardBatchAllocator replicaShardBatchAllocator;
|
@@ -179,6 +207,8 @@ public ShardsBatchGatewayAllocator(
|
179 | 207 | this.replicaShardsBatchGatewayAllocatorTimeout = REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(settings);
|
180 | 208 | clusterSettings.addSettingsUpdateConsumer(REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING, this::setReplicaBatchAllocatorTimeout);
|
181 | 209 | this.clusterManagerMetrics = clusterManagerMetrics;
|
| 210 | + setFollowUpRerouteTaskPriority(FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(settings)); |
| 211 | + clusterSettings.addSettingsUpdateConsumer(FOLLOW_UP_REROUTE_PRIORITY_SETTING, this::setFollowUpRerouteTaskPriority); |
182 | 212 | }
|
183 | 213 |
|
184 | 214 | @Override
|
@@ -309,7 +339,7 @@ public void onComplete() {
|
309 | 339 | assert rerouteService != null;
|
310 | 340 | rerouteService.reroute(
|
311 | 341 | "reroute after existing shards allocator [P] timed out",
|
312 |
| - Priority.NORMAL, |
| 342 | + followUpRerouteTaskPriority, |
313 | 343 | ActionListener.wrap(
|
314 | 344 | r -> logger.trace("reroute after existing shards allocator timed out completed"),
|
315 | 345 | e -> logger.debug("reroute after existing shards allocator timed out failed", e)
|
@@ -344,7 +374,7 @@ public void onComplete() {
|
344 | 374 | assert rerouteService != null;
|
345 | 375 | rerouteService.reroute(
|
346 | 376 | "reroute after existing shards allocator [R] timed out",
|
347 |
| - Priority.NORMAL, |
| 377 | + followUpRerouteTaskPriority, |
348 | 378 | ActionListener.wrap(
|
349 | 379 | r -> logger.trace("reroute after existing shards allocator timed out completed"),
|
350 | 380 | e -> logger.debug("reroute after existing shards allocator timed out failed", e)
|
@@ -920,4 +950,8 @@ protected void setPrimaryBatchAllocatorTimeout(TimeValue primaryShardsBatchGatew
|
920 | 950 | protected void setReplicaBatchAllocatorTimeout(TimeValue replicaShardsBatchGatewayAllocatorTimeout) {
|
921 | 951 | this.replicaShardsBatchGatewayAllocatorTimeout = replicaShardsBatchGatewayAllocatorTimeout;
|
922 | 952 | }
|
| 953 | + |
| 954 | + private void setFollowUpRerouteTaskPriority(Priority followUpRerouteTaskPriority) { |
| 955 | + this.followUpRerouteTaskPriority = followUpRerouteTaskPriority; |
| 956 | + } |
923 | 957 | }
|
0 commit comments