Skip to content

Commit 74d6094

Browse files
committed
Add Setting to adjust the primary constraint weights
Signed-off-by: Arpit Bandejiya <[email protected]>
1 parent 119abaf commit 74d6094

File tree

5 files changed

+54
-12
lines changed

5 files changed

+54
-12
lines changed

server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ public void updateAllocationConstraint(String constraint, boolean enable) {
3939
this.constraints.get(constraint).setEnable(enable);
4040
}
4141

42-
public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) {
43-
Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index);
42+
public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index, long primaryThresholdWeight) {
43+
Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index, primaryThresholdWeight);
4444
return params.weight(constraints);
4545
}
4646
}

server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import java.util.Map;
1515
import java.util.function.Predicate;
1616

17-
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CONSTRAINT_WEIGHT;
17+
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.predicateKeyToWeightMap;
1818

1919
/**
2020
* Defines a constraint useful to de-prioritize certain nodes as target of unassigned shards used in {@link AllocationConstraints} or
@@ -44,11 +44,13 @@ static class ConstraintParams {
4444
private ShardsBalancer balancer;
4545
private BalancedShardsAllocator.ModelNode node;
4646
private String index;
47+
private long PrimaryConstraintThreshold;
4748

48-
ConstraintParams(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) {
49+
ConstraintParams(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index, long primaryConstraintThreshold) {
4950
this.balancer = balancer;
5051
this.node = node;
5152
this.index = index;
53+
this.PrimaryConstraintThreshold = primaryConstraintThreshold;
5254
}
5355

5456
public ShardsBalancer getBalancer() {
@@ -75,9 +77,12 @@ public String getIndex() {
7577
*/
7678
public long weight(Map<String, Constraint> constraints) {
7779
long totalConstraintWeight = 0;
78-
for (Constraint constraint : constraints.values()) {
80+
for (Map.Entry<String, Constraint> entry : constraints.entrySet()) {
81+
String key = entry.getKey();
82+
Constraint constraint = entry.getValue();
7983
if (constraint.test(this)) {
80-
totalConstraintWeight += CONSTRAINT_WEIGHT;
84+
double weight = predicateKeyToWeightMap(key, PrimaryConstraintThreshold);
85+
totalConstraintWeight += weight;
8186
}
8287
}
8388
return totalConstraintWeight;

server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,14 @@ public static Predicate<Constraint.ConstraintParams> isPrimaryShardsPerNodeBreac
8686
return primaryShardCount >= allowedPrimaryShardCount;
8787
};
8888
}
89+
90+
public static long predicateKeyToWeightMap(String key, long primaryConstraintWeight) {
91+
switch (key) {
92+
case CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID:
93+
case CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID:
94+
return primaryConstraintWeight;
95+
default:
96+
return CONSTRAINT_WEIGHT;
97+
}
98+
}
8999
}

server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ public void updateRebalanceConstraint(String constraint, boolean enable) {
4242
this.constraints.get(constraint).setEnable(enable);
4343
}
4444

45-
public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) {
46-
Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index);
45+
public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index, long primaryConstraintThreshold) {
46+
Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index, primaryConstraintThreshold);
4747
return params.weight(constraints);
4848
}
4949
}

server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,14 @@ public class BalancedShardsAllocator implements ShardsAllocator {
139139
Property.NodeScope
140140
);
141141

142+
public static final Setting<Long> PRIMARY_CONSTRAINT_THRESHOLD_SETTING = Setting.longSetting(
143+
"cluster.routing.allocation.primary_constraint.threshold",
144+
10,
145+
0,
146+
Property.Dynamic,
147+
Property.NodeScope
148+
);
149+
142150
/**
143151
* This setting governs whether primary shards balance is desired during allocation. This is used by {@link ConstraintTypes#isPerIndexPrimaryShardsPerNodeBreached()}
144152
* and {@link ConstraintTypes#isPrimaryShardsPerNodeBreached} which is used during unassigned shard allocation
@@ -201,6 +209,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
201209
private volatile float shardBalanceFactor;
202210
private volatile WeightFunction weightFunction;
203211
private volatile float threshold;
212+
private volatile long primaryConstraintThreshold;
204213

205214
private volatile boolean ignoreThrottleInRestore;
206215
private volatile TimeValue allocatorTimeout;
@@ -219,6 +228,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
219228
setIgnoreThrottleInRestore(IGNORE_THROTTLE_FOR_REMOTE_RESTORE.get(settings));
220229
updateWeightFunction();
221230
setThreshold(THRESHOLD_SETTING.get(settings));
231+
setPrimaryConstraintThresholdSetting(PRIMARY_CONSTRAINT_THRESHOLD_SETTING.get(settings));
222232
setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings));
223233
setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings));
224234
setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings));
@@ -231,6 +241,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
231241
clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_REBALANCE_BUFFER, this::updatePreferPrimaryShardBalanceBuffer);
232242
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance);
233243
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
244+
clusterSettings.addSettingsUpdateConsumer(PRIMARY_CONSTRAINT_THRESHOLD_SETTING, this::setPrimaryConstraintThresholdSetting);
234245
clusterSettings.addSettingsUpdateConsumer(IGNORE_THROTTLE_FOR_REMOTE_RESTORE, this::setIgnoreThrottleInRestore);
235246
clusterSettings.addSettingsUpdateConsumer(ALLOCATOR_TIMEOUT_SETTING, this::setAllocatorTimeout);
236247
}
@@ -294,7 +305,12 @@ private void updatePreferPrimaryShardBalanceBuffer(float preferPrimaryShardBalan
294305
}
295306

296307
private void updateWeightFunction() {
297-
weightFunction = new WeightFunction(this.indexBalanceFactor, this.shardBalanceFactor, this.preferPrimaryShardRebalanceBuffer);
308+
weightFunction = new WeightFunction(
309+
this.indexBalanceFactor,
310+
this.shardBalanceFactor,
311+
this.preferPrimaryShardRebalanceBuffer,
312+
this.primaryConstraintThreshold
313+
);
298314
}
299315

300316
/**
@@ -317,6 +333,11 @@ private void setThreshold(float threshold) {
317333
this.threshold = threshold;
318334
}
319335

336+
private void setPrimaryConstraintThresholdSetting(long threshold) {
337+
this.primaryConstraintThreshold = threshold;
338+
this.weightFunction.updatePrimaryConstraintThreshold(threshold);
339+
}
340+
320341
private void setAllocatorTimeout(TimeValue allocatorTimeout) {
321342
this.allocatorTimeout = allocatorTimeout;
322343
}
@@ -489,10 +510,11 @@ static class WeightFunction {
489510
private final float shardBalance;
490511
private final float theta0;
491512
private final float theta1;
513+
private long primaryConstraintThreshold;
492514
private AllocationConstraints constraints;
493515
private RebalanceConstraints rebalanceConstraints;
494516

495-
WeightFunction(float indexBalance, float shardBalance, float preferPrimaryBalanceBuffer) {
517+
WeightFunction(float indexBalance, float shardBalance, float preferPrimaryBalanceBuffer, long primaryConstraintThreshold) {
496518
float sum = indexBalance + shardBalance;
497519
if (sum <= 0.0f) {
498520
throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
@@ -501,6 +523,7 @@ static class WeightFunction {
501523
theta1 = indexBalance / sum;
502524
this.indexBalance = indexBalance;
503525
this.shardBalance = shardBalance;
526+
this.primaryConstraintThreshold = primaryConstraintThreshold;
504527
RebalanceParameter rebalanceParameter = new RebalanceParameter(preferPrimaryBalanceBuffer);
505528
this.constraints = new AllocationConstraints();
506529
this.rebalanceConstraints = new RebalanceConstraints(rebalanceParameter);
@@ -510,12 +533,12 @@ static class WeightFunction {
510533

511534
public float weightWithAllocationConstraints(ShardsBalancer balancer, ModelNode node, String index) {
512535
float balancerWeight = weight(balancer, node, index);
513-
return balancerWeight + constraints.weight(balancer, node, index);
536+
return balancerWeight + constraints.weight(balancer, node, index, primaryConstraintThreshold);
514537
}
515538

516539
public float weightWithRebalanceConstraints(ShardsBalancer balancer, ModelNode node, String index) {
517540
float balancerWeight = weight(balancer, node, index);
518-
return balancerWeight + rebalanceConstraints.weight(balancer, node, index);
541+
return balancerWeight + rebalanceConstraints.weight(balancer, node, index, primaryConstraintThreshold);
519542
}
520543

521544
float weight(ShardsBalancer balancer, ModelNode node, String index) {
@@ -531,6 +554,10 @@ void updateAllocationConstraint(String constraint, boolean enable) {
531554
void updateRebalanceConstraint(String constraint, boolean add) {
532555
this.rebalanceConstraints.updateRebalanceConstraint(constraint, add);
533556
}
557+
558+
void updatePrimaryConstraintThreshold(long primaryConstraintThreshold) {
559+
this.primaryConstraintThreshold = primaryConstraintThreshold;
560+
}
534561
}
535562

536563
/**

0 commit comments

Comments
 (0)