
Commit ad3ac5c

BE: Exclude offline nodes from target calculation when adjusting the replication factor (#1123)
1 parent 2c2b30a commit ad3ac5c

File tree

1 file changed (+26 -6 lines)


api/src/main/java/io/kafbat/ui/service/TopicsService.java

Lines changed: 26 additions & 6 deletions
@@ -26,6 +26,7 @@
 import io.kafbat.ui.model.TopicCreationDTO;
 import io.kafbat.ui.model.TopicUpdateDTO;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -288,6 +289,18 @@ private Map<TopicPartition, Optional<NewPartitionReassignment>> getPartitionsRea
     Map<Integer, Integer> brokersUsage = getBrokersMap(cluster, currentAssignment);
     int currentReplicationFactor = topic.getReplicationFactor();
 
+    // Get online nodes
+    List<Integer> onlineNodes = statisticsCache.get(cluster).getClusterDescription().getNodes()
+        .stream().map(Node::id).toList();
+
+    // Keep only online nodes
+    for (Map.Entry<Integer, List<Integer>> partition : currentAssignment.entrySet()) {
+      partition.getValue().retainAll(onlineNodes);
+    }
+
+    brokersUsage.keySet().retainAll(onlineNodes);
+
+
     // If we should to increase Replication factor
     if (replicationFactorChange.getTotalReplicationFactor() > currentReplicationFactor) {
       // For each partition
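
The filtering step above can be read in isolation: each partition's replica list and the broker-usage map are trimmed to the brokers that the cached cluster description still reports as online. Below is a minimal standalone sketch of that step with hypothetical names; the real code takes the node list from statisticsCache and the cluster description.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified sketch of the "keep only online nodes" step above.
// Replica lists are assumed to be mutable (e.g. ArrayList), since retainAll mutates in place.
class OnlineNodeFilterSketch {

  static void retainOnlineNodes(Map<Integer, List<Integer>> currentAssignment,
                                Map<Integer, Integer> brokersUsage,
                                List<Integer> onlineNodes) {
    // Drop offline brokers from every partition's replica list
    for (Map.Entry<Integer, List<Integer>> partition : currentAssignment.entrySet()) {
      partition.getValue().retainAll(onlineNodes);
    }
    // Drop offline brokers from the usage statistics as well
    brokersUsage.keySet().retainAll(onlineNodes);
  }

  public static void main(String[] args) {
    Map<Integer, List<Integer>> assignment = new HashMap<>();
    assignment.put(0, new ArrayList<>(List.of(1, 2, 3)));
    Map<Integer, Integer> usage = new HashMap<>(Map.of(1, 1, 2, 1, 3, 1));
    retainOnlineNodes(assignment, usage, List.of(1, 2)); // broker 3 is offline
    System.out.println(assignment); // {0=[1, 2]}
    System.out.println(usage);      // {1=1, 2=1}
  }
}
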
@@ -320,28 +333,35 @@ private Map<TopicPartition, Optional<NewPartitionReassignment>> getPartitionsRea
         var partition = assignmentEntry.getKey();
         var brokers = assignmentEntry.getValue();
 
+        // Copy from online nodes if all nodes are offline
+        if (brokers.isEmpty()) {
+          brokers = new ArrayList<>(onlineNodes);
+        }
+
         // Get brokers list sorted by usage in reverse order
         var brokersUsageList = brokersUsage.entrySet().stream()
             .sorted(Map.Entry.comparingByValue(Comparator.reverseOrder()))
             .map(Map.Entry::getKey)
             .toList();
 
+        Integer leader = topic.getPartitions().get(partition).getLeader();
+
         // Iterate brokers and try to remove them from assignment
         // while partition replicas count != requested replication factor
         for (Integer broker : brokersUsageList) {
+          if (brokers.size() == replicationFactorChange.getTotalReplicationFactor()) {
+            break;
+          }
           // Check is the broker the leader of partition
-          if (!topic.getPartitions().get(partition).getLeader()
-              .equals(broker)) {
+          if (leader == null || !leader.equals(broker)) {
             brokers.remove(broker);
             brokersUsage.merge(broker, -1, Integer::sum);
           }
-          if (brokers.size() == replicationFactorChange.getTotalReplicationFactor()) {
-            break;
-          }
         }
         if (brokers.size() != replicationFactorChange.getTotalReplicationFactor()) {
           throw new ValidationException("Something went wrong during removing replicas");
         }
+        currentAssignment.put(partition, brokers);
       }
     } else {
       throw new ValidationException("Replication factor already equals requested");
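
The reworked loop above checks the target size before removing anything and never drops the partition leader; the added null check covers partitions that currently have no leader. A simplified, hypothetical sketch of that removal logic (names are illustrative, not the actual method):

import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical sketch of the replica-removal loop above:
// stop once the target size is reached, and never remove the leader.
class ReplicaRemovalSketch {

  static void shrinkToTarget(List<Integer> brokers, List<Integer> brokersByUsageDesc,
                             Integer leader, int targetReplicationFactor) {
    for (Integer broker : brokersByUsageDesc) {
      if (brokers.size() == targetReplicationFactor) {
        break; // checking up front leaves an already-correct list untouched
      }
      if (leader == null || !leader.equals(broker)) {
        brokers.remove(broker); // no-op if the broker is not in this partition's list
      }
    }
  }

  public static void main(String[] args) {
    List<Integer> brokers = new ArrayList<>(List.of(1, 2, 3));
    shrinkToTarget(brokers, List.of(3, 2, 1), 1, 2); // leader is broker 1
    System.out.println(brokers); // [1, 2]
  }
}
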
@@ -374,7 +394,7 @@ private Map<Integer, Integer> getBrokersMap(KafkaCluster cluster,
             c -> 0
         ));
     currentAssignment.values().forEach(brokers -> brokers
-        .forEach(broker -> result.put(broker, result.get(broker) + 1)));
+        .forEach(broker -> result.put(broker, result.getOrDefault(broker, 0) + 1)));
 
     return result;
   }
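
The getBrokersMap change switches from get to getOrDefault so that a replica referencing a broker that was not pre-seeded into the map (for example, a broker that is no longer part of the cluster) does not cause a NullPointerException during unboxing. A small hypothetical sketch of that counting step:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the usage-counting step: getOrDefault avoids an NPE
// when an assignment references a broker that is absent from the result map.
class BrokerUsageSketch {

  static Map<Integer, Integer> countUsage(Map<Integer, List<Integer>> currentAssignment) {
    Map<Integer, Integer> result = new HashMap<>();
    currentAssignment.values().forEach(brokers -> brokers
        .forEach(broker -> result.put(broker, result.getOrDefault(broker, 0) + 1)));
    return result;
  }

  public static void main(String[] args) {
    // Broker 9 appears in an assignment but is not pre-seeded; get() would have returned null here.
    System.out.println(countUsage(Map.of(0, List.of(1, 9), 1, List.of(1)))); // {1=2, 9=1}
  }
}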
