Skip to content

Add a new goal to distribute topic leaders fairly #2267

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public class BalancingConstraint {
private final Map<Resource, Double> _resourceBalancePercentage;
private final double _replicaBalancePercentage;
private final double _leaderReplicaBalancePercentage;
private final double _topicLeaderReplicaBalancePercentage;
private final int _topicLeaderReplicaBalanceMinGap;
private final int _topicLeaderReplicaBalanceMaxGap;
private final double _topicReplicaBalancePercentage;
private final int _topicReplicaBalanceMinGap;
private final int _topicReplicaBalanceMaxGap;
Expand Down Expand Up @@ -75,6 +78,9 @@ public BalancingConstraint(KafkaCruiseControlConfig config) {
// Set default value for the balance percentage of (1) replica, (2) leader replica and (3) topic replica distribution.
_replicaBalancePercentage = config.getDouble(AnalyzerConfig.REPLICA_COUNT_BALANCE_THRESHOLD_CONFIG);
_leaderReplicaBalancePercentage = config.getDouble(AnalyzerConfig.LEADER_REPLICA_COUNT_BALANCE_THRESHOLD_CONFIG);
_topicLeaderReplicaBalancePercentage = config.getDouble(AnalyzerConfig.TOPIC_LEADER_REPLICA_COUNT_BALANCE_THRESHOLD_CONFIG);
_topicLeaderReplicaBalanceMinGap = config.getInt(AnalyzerConfig.TOPIC_LEADER_REPLICA_COUNT_BALANCE_MIN_GAP_CONFIG);
_topicLeaderReplicaBalanceMaxGap = config.getInt(AnalyzerConfig.TOPIC_LEADER_REPLICA_COUNT_BALANCE_MAX_GAP_CONFIG);
_topicReplicaBalancePercentage = config.getDouble(AnalyzerConfig.TOPIC_REPLICA_COUNT_BALANCE_THRESHOLD_CONFIG);
_topicReplicaBalanceMinGap = config.getInt(AnalyzerConfig.TOPIC_REPLICA_COUNT_BALANCE_MIN_GAP_CONFIG);
_topicReplicaBalanceMaxGap = config.getInt(AnalyzerConfig.TOPIC_REPLICA_COUNT_BALANCE_MAX_GAP_CONFIG);
Expand Down Expand Up @@ -122,6 +128,9 @@ Properties setProps(Properties props) {
props.put(AnalyzerConfig.TOPIC_REPLICA_COUNT_BALANCE_THRESHOLD_CONFIG, Double.toString(_topicReplicaBalancePercentage));
props.put(AnalyzerConfig.TOPIC_REPLICA_COUNT_BALANCE_MIN_GAP_CONFIG, Integer.toString(_topicReplicaBalanceMinGap));
props.put(AnalyzerConfig.TOPIC_REPLICA_COUNT_BALANCE_MAX_GAP_CONFIG, Integer.toString(_topicReplicaBalanceMaxGap));
props.put(AnalyzerConfig.TOPIC_LEADER_REPLICA_COUNT_BALANCE_THRESHOLD_CONFIG, Double.toString(_topicLeaderReplicaBalancePercentage));
props.put(AnalyzerConfig.TOPIC_LEADER_REPLICA_COUNT_BALANCE_MIN_GAP_CONFIG, Integer.toString(_topicLeaderReplicaBalanceMinGap));
props.put(AnalyzerConfig.TOPIC_LEADER_REPLICA_COUNT_BALANCE_MAX_GAP_CONFIG, Integer.toString(_topicLeaderReplicaBalanceMaxGap));
props.put(AnalyzerConfig.GOAL_VIOLATION_DISTRIBUTION_THRESHOLD_MULTIPLIER_CONFIG, Double.toString(_goalViolationDistributionThresholdMultiplier));
props.put(AnalyzerConfig.TOPICS_WITH_MIN_LEADERS_PER_BROKER_CONFIG, _topicsWithMinLeadersPerBrokerPattern.pattern());
props.put(AnalyzerConfig.MIN_TOPIC_LEADERS_PER_BROKER_CONFIG, Integer.toString(_minTopicLeadersPerBroker));
Expand Down Expand Up @@ -197,6 +206,27 @@ public int topicReplicaBalanceMaxGap() {
return _topicReplicaBalanceMaxGap;
}

/**
* @return Topic replica balance percentage for {@link com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicLeaderReplicaDistributionGoal}.
*/
public double topicLeaderReplicaBalancePercentage() {
return _topicLeaderReplicaBalancePercentage;
}

/**
* @return Topic replica balance minimum gap for {@link com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicLeaderReplicaDistributionGoal}.
*/
public int topicLeaderReplicaBalanceMinGap() {
return _topicLeaderReplicaBalanceMinGap;
}

/**
* @return Topic replica balance maximum gap for {@link com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicLeaderReplicaDistributionGoal}.
*/
public int topicLeaderReplicaBalanceMaxGap() {
return _topicLeaderReplicaBalanceMaxGap;
}

/**
* @return Goal violation distribution threshold multiplier to be used in detection and fixing goal violations.
*/
Expand Down Expand Up @@ -332,6 +362,7 @@ public String toString() {
+ "diskCapacityThreshold=%.4f,inboundNwCapacityThreshold=%.4f,outboundNwCapacityThreshold=%.4f,"
+ "maxReplicasPerBroker=%d,replicaBalancePercentage=%.4f,leaderReplicaBalancePercentage=%.4f,"
+ "topicReplicaBalancePercentage=%.4f,topicReplicaBalanceGap=[%d,%d],"
+ "topicLeaderReplicaBalancePercentage=%.4f,topicLeaderReplicaBalanceGap=[%d,%d],"
+ "goalViolationDistributionThresholdMultiplier=%.4f,"
+ "topicsWithMinLeadersPerBrokerPattern=%s,"
+ "minTopicLeadersPerBroker=%d,fastModePerBrokerMoveTimeoutMs=%d,"
Expand All @@ -343,6 +374,7 @@ public String toString() {
_capacityThreshold.get(Resource.NW_IN), _capacityThreshold.get(Resource.NW_OUT),
_maxReplicasPerBroker, _replicaBalancePercentage, _leaderReplicaBalancePercentage,
_topicReplicaBalancePercentage, _topicReplicaBalanceMinGap, _topicReplicaBalanceMaxGap,
_topicLeaderReplicaBalancePercentage, _topicLeaderReplicaBalanceMinGap, _topicLeaderReplicaBalanceMaxGap,
_goalViolationDistributionThresholdMultiplier, _topicsWithMinLeadersPerBrokerPattern.pattern(),
_minTopicLeadersPerBroker, _fastModePerBrokerMoveTimeoutMs, _brokerSetResolver.getClass().getName(),
_replicaToBrokerSetMappingPolicy.getClass().getName());
Expand Down
Loading