Skip to content

KAFKA-18893: Add KIP-877 support to ReplicaSelector #19064

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Mar 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
/**
* Pluggable interface for selecting a preferred read replica given the current set of replicas for a partition
* and metadata from the client.
* Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the selector to register metrics.
* The following tags are automatically added to all metrics registered: <code>config</code> set to
* <code>replica.selector.class</code>, and <code>class</code> set to the ReplicaSelector class name.
*/
public interface ReplicaSelector extends Configurable, Closeable {

Expand Down
18 changes: 9 additions & 9 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrU
import kafka.server.share.DelayedShareFetch
import kafka.utils._
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.internals.{Plugin, Topic}
import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult
import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsTopic
import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
Expand Down Expand Up @@ -338,7 +338,7 @@ class ReplicaManager(val config: KafkaConfig,
}

// Visible for testing
private[server] val replicaSelectorOpt: Option[ReplicaSelector] = createReplicaSelector()
private[server] val replicaSelectorPlugin: Option[Plugin[ReplicaSelector]] = createReplicaSelector(metrics)

metricsGroup.newGauge(LeaderCountMetricName, () => leaderPartitionsIterator.size)
// Visible for testing
Expand Down Expand Up @@ -1723,8 +1723,8 @@ class ReplicaManager(val config: KafkaConfig,
metadata => findPreferredReadReplica(partition, metadata, params.replicaId, fetchInfo.fetchOffset, fetchTimeMs))

if (preferredReadReplica.isDefined) {
replicaSelectorOpt.foreach { selector =>
debug(s"Replica selector ${selector.getClass.getSimpleName} returned preferred replica " +
replicaSelectorPlugin.foreach { selector =>
debug(s"Replica selector ${selector.get.getClass.getSimpleName} returned preferred replica " +
s"${preferredReadReplica.get} for ${params.clientMetadata}")
}
// If a preferred read-replica is set, skip the read
Expand Down Expand Up @@ -1890,7 +1890,7 @@ class ReplicaManager(val config: KafkaConfig,
if (FetchRequest.isValidBrokerId(replicaId))
None
else {
replicaSelectorOpt.flatMap { replicaSelector =>
replicaSelectorPlugin.flatMap { replicaSelector =>
val replicaEndpoints = metadataCache.getPartitionReplicaEndpoints(partition.topicPartition,
new ListenerName(clientMetadata.listenerName)).asScala
val replicaInfoSet = mutable.Set[ReplicaView]()
Expand Down Expand Up @@ -1921,7 +1921,7 @@ class ReplicaManager(val config: KafkaConfig,
replicaInfoSet.add(leaderReplica)

val partitionInfo = new DefaultPartitionView(replicaInfoSet.asJava, leaderReplica)
replicaSelector.select(partition.topicPartition, clientMetadata, partitionInfo).toScala.collect {
replicaSelector.get.select(partition.topicPartition, clientMetadata, partitionInfo).toScala.collect {
// Even though the replica selector can return the leader, we don't want to send it out with the
// FetchResponse, so we exclude it here
case selected if !selected.endpoint.isEmpty && selected != leaderReplica => selected.endpoint.id
Expand Down Expand Up @@ -2565,7 +2565,7 @@ class ReplicaManager(val config: KafkaConfig,
delayedShareFetchPurgatory.shutdown()
if (checkpointHW)
checkpointHighWatermarks()
replicaSelectorOpt.foreach(_.close)
replicaSelectorPlugin.foreach(_.close)
removeAllTopicMetrics()
addPartitionsToTxnManager.foreach(_.shutdown())
info("Shut down completely")
Expand All @@ -2587,11 +2587,11 @@ class ReplicaManager(val config: KafkaConfig,
new ReplicaAlterLogDirsManager(config, this, quotaManager, brokerTopicStats, directoryEventHandler)
}

private def createReplicaSelector(): Option[ReplicaSelector] = {
private def createReplicaSelector(metrics: Metrics): Option[Plugin[ReplicaSelector]] = {
config.replicaSelectorClassName.map { className =>
val tmpReplicaSelector: ReplicaSelector = Utils.newInstance(className, classOf[ReplicaSelector])
tmpReplicaSelector.configure(config.originals())
tmpReplicaSelector
Plugin.wrapInstance(tmpReplicaSelector, metrics, className)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just noticed that this is not correct. Instead of className, the second argument should be the name of the configuration: REPLICA_SELECTOR_CLASS_CONFIG.

I fixed that in my PR adding KIP-877 support to the authorizer: #19050

}
}

Expand Down
29 changes: 26 additions & 3 deletions core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ import org.apache.kafka.common.message.{DeleteRecordsResponseData, ShareFetchRes
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.metadata.{PartitionChangeRecord, PartitionRecord, RemoveTopicRecord, TopicRecord}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.metrics.Monitorable
import org.apache.kafka.common.metrics.PluginMetrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record._
Expand Down Expand Up @@ -1660,7 +1662,7 @@ class ReplicaManagerTest {

// PartitionView passed to ReplicaSelector should not contain the follower as it's not in the ISR
val expectedReplicaViews = Set(new DefaultReplicaView(leaderNode, 1, 0))
val partitionView = replicaManager.replicaSelectorOpt.get
val partitionView = replicaManager.replicaSelectorPlugin.get.get
.asInstanceOf[MockReplicaSelector].getPartitionViewArgument

assertTrue(partitionView.isDefined)
Expand Down Expand Up @@ -1709,7 +1711,7 @@ class ReplicaManagerTest {
assertTrue(consumerResult.hasFired)

// Expect the preferred read replica selection not to run
assertEquals(0, replicaManager.replicaSelectorOpt.get.asInstanceOf[MockReplicaSelector].getSelectionCount)
assertEquals(0, replicaManager.replicaSelectorPlugin.get.get.asInstanceOf[MockReplicaSelector].getSelectionCount)

// Only leader will compute preferred replica
assertTrue(!consumerResult.assertFired.preferredReadReplica.isPresent)
Expand Down Expand Up @@ -1893,7 +1895,7 @@ class ReplicaManagerTest {
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch, expectTruncation = true)
try {
assertFalse(replicaManager.replicaSelectorOpt.isDefined)
assertFalse(replicaManager.replicaSelectorPlugin.isDefined)
} finally {
replicaManager.shutdown(checkpointHW = false)
}
Expand Down Expand Up @@ -6132,6 +6134,18 @@ class ReplicaManagerTest {

}


@Test
def testMonitorableReplicaSelector(): Unit = {
  // Configure a selector that implements Monitorable so we can observe whether
  // the broker invoked withPluginMetrics() on it during ReplicaManager setup.
  val manager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
    propsModifier = props => props.put(ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG, classOf[MonitorableReplicaSelector].getName))
  try {
    val selector = manager.replicaSelectorPlugin.get.get.asInstanceOf[MonitorableReplicaSelector]
    // The flag is flipped by withPluginMetrics(), proving KIP-877 wiring works.
    assertTrue(selector.pluginMetrics)
  } finally {
    manager.shutdown(checkpointHW = false)
  }
}

private def readFromLogWithOffsetOutOfRange(tp: TopicPartition): Seq[(TopicIdPartition, LogReadResult)] = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true)
try {
Expand Down Expand Up @@ -6211,3 +6225,12 @@ class MockReplicaSelector extends ReplicaSelector {
Optional.of(partitionView.leader)
}
}


/**
 * Test selector implementing [[org.apache.kafka.common.metrics.Monitorable]] so tests can
 * verify that the broker passes a PluginMetrics instance to the plugin (KIP-877).
 */
class MonitorableReplicaSelector extends MockReplicaSelector with Monitorable {
  // Set to true once withPluginMetrics() has been invoked by the broker.
  var pluginMetrics: Boolean = false

  override def withPluginMetrics(metrics: PluginMetrics): Unit =
    pluginMetrics = true
}