Skip to content

Commit 79fe130

Browse files
authored
KAFKA-18893: Add KIP-877 support to ReplicaSelector (#19064)
ReplicaSelector implementations can implement Monitorable to register their own metrics. Reviewers: Mickael Maison <[email protected]>, Ken Huang <[email protected]>
1 parent 8fa3856 commit 79fe130

File tree

3 files changed

+38
-12
lines changed

3 files changed

+38
-12
lines changed

clients/src/main/java/org/apache/kafka/common/replica/ReplicaSelector.java

+3
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
/**
2828
* Plug-able interface for selecting a preferred read replica given the current set of replicas for a partition
2929
* and metadata from the client.
30+
* Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the selector to register metrics.
31+
* The following tags are automatically added to all metrics registered: <code>config</code> set to
32+
* <code>replica.selector.class</code>, and <code>class</code> set to the ReplicaSelector class name.
3033
*/
3134
public interface ReplicaSelector extends Configurable, Closeable {
3235

core/src/main/scala/kafka/server/ReplicaManager.scala

+9-9
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrU
2727
import kafka.server.share.DelayedShareFetch
2828
import kafka.utils._
2929
import org.apache.kafka.common.errors._
30-
import org.apache.kafka.common.internals.Topic
30+
import org.apache.kafka.common.internals.{Plugin, Topic}
3131
import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult
3232
import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsTopic
3333
import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
@@ -338,7 +338,7 @@ class ReplicaManager(val config: KafkaConfig,
338338
}
339339

340340
// Visible for testing
341-
private[server] val replicaSelectorOpt: Option[ReplicaSelector] = createReplicaSelector()
341+
private[server] val replicaSelectorPlugin: Option[Plugin[ReplicaSelector]] = createReplicaSelector(metrics)
342342

343343
metricsGroup.newGauge(LeaderCountMetricName, () => leaderPartitionsIterator.size)
344344
// Visible for testing
@@ -1723,8 +1723,8 @@ class ReplicaManager(val config: KafkaConfig,
17231723
metadata => findPreferredReadReplica(partition, metadata, params.replicaId, fetchInfo.fetchOffset, fetchTimeMs))
17241724

17251725
if (preferredReadReplica.isDefined) {
1726-
replicaSelectorOpt.foreach { selector =>
1727-
debug(s"Replica selector ${selector.getClass.getSimpleName} returned preferred replica " +
1726+
replicaSelectorPlugin.foreach { selector =>
1727+
debug(s"Replica selector ${selector.get.getClass.getSimpleName} returned preferred replica " +
17281728
s"${preferredReadReplica.get} for ${params.clientMetadata}")
17291729
}
17301730
// If a preferred read-replica is set, skip the read
@@ -1890,7 +1890,7 @@ class ReplicaManager(val config: KafkaConfig,
18901890
if (FetchRequest.isValidBrokerId(replicaId))
18911891
None
18921892
else {
1893-
replicaSelectorOpt.flatMap { replicaSelector =>
1893+
replicaSelectorPlugin.flatMap { replicaSelector =>
18941894
val replicaEndpoints = metadataCache.getPartitionReplicaEndpoints(partition.topicPartition,
18951895
new ListenerName(clientMetadata.listenerName)).asScala
18961896
val replicaInfoSet = mutable.Set[ReplicaView]()
@@ -1921,7 +1921,7 @@ class ReplicaManager(val config: KafkaConfig,
19211921
replicaInfoSet.add(leaderReplica)
19221922

19231923
val partitionInfo = new DefaultPartitionView(replicaInfoSet.asJava, leaderReplica)
1924-
replicaSelector.select(partition.topicPartition, clientMetadata, partitionInfo).toScala.collect {
1924+
replicaSelector.get.select(partition.topicPartition, clientMetadata, partitionInfo).toScala.collect {
19251925
// Even though the replica selector can return the leader, we don't want to send it out with the
19261926
// FetchResponse, so we exclude it here
19271927
case selected if !selected.endpoint.isEmpty && selected != leaderReplica => selected.endpoint.id
@@ -2565,7 +2565,7 @@ class ReplicaManager(val config: KafkaConfig,
25652565
delayedShareFetchPurgatory.shutdown()
25662566
if (checkpointHW)
25672567
checkpointHighWatermarks()
2568-
replicaSelectorOpt.foreach(_.close)
2568+
replicaSelectorPlugin.foreach(_.close)
25692569
removeAllTopicMetrics()
25702570
addPartitionsToTxnManager.foreach(_.shutdown())
25712571
info("Shut down completely")
@@ -2587,11 +2587,11 @@ class ReplicaManager(val config: KafkaConfig,
25872587
new ReplicaAlterLogDirsManager(config, this, quotaManager, brokerTopicStats, directoryEventHandler)
25882588
}
25892589

2590-
private def createReplicaSelector(): Option[ReplicaSelector] = {
2590+
private def createReplicaSelector(metrics: Metrics): Option[Plugin[ReplicaSelector]] = {
25912591
config.replicaSelectorClassName.map { className =>
25922592
val tmpReplicaSelector: ReplicaSelector = Utils.newInstance(className, classOf[ReplicaSelector])
25932593
tmpReplicaSelector.configure(config.originals())
2594-
tmpReplicaSelector
2594+
Plugin.wrapInstance(tmpReplicaSelector, metrics, className)
25952595
}
25962596
}
25972597

core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala

+26-3
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ import org.apache.kafka.common.message.{DeleteRecordsResponseData, ShareFetchRes
4040
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
4141
import org.apache.kafka.common.metadata.{PartitionChangeRecord, PartitionRecord, RemoveTopicRecord, TopicRecord}
4242
import org.apache.kafka.common.metrics.Metrics
43+
import org.apache.kafka.common.metrics.Monitorable
44+
import org.apache.kafka.common.metrics.PluginMetrics
4345
import org.apache.kafka.common.network.ListenerName
4446
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
4547
import org.apache.kafka.common.record._
@@ -1660,7 +1662,7 @@ class ReplicaManagerTest {
16601662

16611663
// PartitionView passed to ReplicaSelector should not contain the follower as it's not in the ISR
16621664
val expectedReplicaViews = Set(new DefaultReplicaView(leaderNode, 1, 0))
1663-
val partitionView = replicaManager.replicaSelectorOpt.get
1665+
val partitionView = replicaManager.replicaSelectorPlugin.get.get
16641666
.asInstanceOf[MockReplicaSelector].getPartitionViewArgument
16651667

16661668
assertTrue(partitionView.isDefined)
@@ -1709,7 +1711,7 @@ class ReplicaManagerTest {
17091711
assertTrue(consumerResult.hasFired)
17101712

17111713
// Expect not run the preferred read replica selection
1712-
assertEquals(0, replicaManager.replicaSelectorOpt.get.asInstanceOf[MockReplicaSelector].getSelectionCount)
1714+
assertEquals(0, replicaManager.replicaSelectorPlugin.get.get.asInstanceOf[MockReplicaSelector].getSelectionCount)
17131715

17141716
// Only leader will compute preferred replica
17151717
assertTrue(!consumerResult.assertFired.preferredReadReplica.isPresent)
@@ -1893,7 +1895,7 @@ class ReplicaManagerTest {
18931895
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
18941896
leaderBrokerId, countDownLatch, expectTruncation = true)
18951897
try {
1896-
assertFalse(replicaManager.replicaSelectorOpt.isDefined)
1898+
assertFalse(replicaManager.replicaSelectorPlugin.isDefined)
18971899
} finally {
18981900
replicaManager.shutdown(checkpointHW = false)
18991901
}
@@ -6132,6 +6134,18 @@ class ReplicaManagerTest {
61326134

61336135
}
61346136

6137+
6138+
@Test
6139+
def testMonitorableReplicaSelector(): Unit = {
6140+
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
6141+
propsModifier = props => props.put(ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG, classOf[MonitorableReplicaSelector].getName))
6142+
try {
6143+
assertTrue(replicaManager.replicaSelectorPlugin.get.get.asInstanceOf[MonitorableReplicaSelector].pluginMetrics)
6144+
} finally {
6145+
replicaManager.shutdown(checkpointHW = false)
6146+
}
6147+
}
6148+
61356149
private def readFromLogWithOffsetOutOfRange(tp: TopicPartition): Seq[(TopicIdPartition, LogReadResult)] = {
61366150
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true)
61376151
try {
@@ -6211,3 +6225,12 @@ class MockReplicaSelector extends ReplicaSelector {
62116225
Optional.of(partitionView.leader)
62126226
}
62136227
}
6228+
6229+
6230+
class MonitorableReplicaSelector extends MockReplicaSelector with Monitorable {
6231+
var pluginMetrics = false
6232+
6233+
override def withPluginMetrics(metrics: PluginMetrics): Unit = {
6234+
pluginMetrics = true
6235+
}
6236+
}

0 commit comments

Comments
 (0)