Skip to content

Commit fda5597

Browse files
Offline calculation of total shards per node and caching it for weight calculation inside LocalShardsBalancer (#14675) (#14689)
(cherry picked from commit 6d0484a) Signed-off-by: RS146BIJAY <[email protected]> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 8b7e8fe commit fda5597

File tree

1 file changed

+10
-2
lines changed

1 file changed

+10
-2
lines changed

server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ public class LocalShardsBalancer extends ShardsBalancer {
6868
private final float avgPrimaryShardsPerNode;
6969
private final BalancedShardsAllocator.NodeSorter sorter;
7070
private final Set<RoutingNode> inEligibleTargetNode;
71+
private int totalShardCount = 0;
7172

7273
public LocalShardsBalancer(
7374
Logger logger,
@@ -125,8 +126,7 @@ public float avgPrimaryShardsPerNode() {
125126
*/
126127
@Override
127128
public float avgShardsPerNode() {
128-
float totalShards = nodes.values().stream().map(BalancedShardsAllocator.ModelNode::numShards).reduce(0, Integer::sum);
129-
return totalShards / nodes.size();
129+
return totalShardCount / nodes.size();
130130
}
131131

132132
/**
@@ -598,13 +598,15 @@ void moveShards() {
598598
final BalancedShardsAllocator.ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
599599
final BalancedShardsAllocator.ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId());
600600
sourceNode.removeShard(shardRouting);
601+
--totalShardCount;
601602
Tuple<ShardRouting, ShardRouting> relocatingShards = routingNodes.relocateShard(
602603
shardRouting,
603604
targetNode.getNodeId(),
604605
allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
605606
allocation.changes()
606607
);
607608
targetNode.addShard(relocatingShards.v2());
609+
++totalShardCount;
608610
if (logger.isTraceEnabled()) {
609611
logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode());
610612
}
@@ -724,6 +726,7 @@ private Map<String, BalancedShardsAllocator.ModelNode> buildModelFromAssigned()
724726
/* we skip relocating shards here since we expect an initializing shard with the same id coming in */
725727
if (RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation)) && shard.state() != RELOCATING) {
726728
node.addShard(shard);
729+
++totalShardCount;
727730
if (logger.isTraceEnabled()) {
728731
logger.trace("Assigned shard [{}] to node [{}]", shard, node.getNodeId());
729732
}
@@ -815,6 +818,7 @@ void allocateUnassigned() {
815818
);
816819
shard = routingNodes.initializeShard(shard, minNode.getNodeId(), null, shardSize, allocation.changes());
817820
minNode.addShard(shard);
821+
++totalShardCount;
818822
if (!shard.primary()) {
819823
// copy over the same replica shards to the secondary array so they will get allocated
820824
// in a subsequent iteration, allowing replicas of other shards to be allocated first
@@ -844,6 +848,7 @@ void allocateUnassigned() {
844848
allocation.routingTable()
845849
);
846850
minNode.addShard(shard.initialize(minNode.getNodeId(), null, shardSize));
851+
++totalShardCount;
847852
} else {
848853
if (logger.isTraceEnabled()) {
849854
logger.trace("No Node found to assign shard [{}]", shard);
@@ -1011,18 +1016,21 @@ private boolean tryRelocateShard(BalancedShardsAllocator.ModelNode minNode, Bala
10111016
}
10121017
final Decision decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision);
10131018
maxNode.removeShard(shard);
1019+
--totalShardCount;
10141020
long shardSize = allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
10151021

10161022
if (decision.type() == Decision.Type.YES) {
10171023
/* only allocate on the cluster if we are not throttled */
10181024
logger.debug("Relocate [{}] from [{}] to [{}]", shard, maxNode.getNodeId(), minNode.getNodeId());
10191025
minNode.addShard(routingNodes.relocateShard(shard, minNode.getNodeId(), shardSize, allocation.changes()).v1());
1026+
++totalShardCount;
10201027
return true;
10211028
} else {
10221029
/* allocate on the model even if throttled */
10231030
logger.debug("Simulate relocation of [{}] from [{}] to [{}]", shard, maxNode.getNodeId(), minNode.getNodeId());
10241031
assert decision.type() == Decision.Type.THROTTLE;
10251032
minNode.addShard(shard.relocate(minNode.getNodeId(), shardSize));
1033+
++totalShardCount;
10261034
return false;
10271035
}
10281036
}

0 commit comments

Comments
 (0)