-
Notifications
You must be signed in to change notification settings - Fork 14.3k
KAFKA-18893: Add KIP-877 support to ReplicaSelector #19064
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank @TaiJuWu this PR, left a comment
@@ -27,6 +27,9 @@ | |||
/** | |||
* Plug-able interface for selecting a preferred read replica given the current set of replicas for a partition | |||
* and metadata from the client. | |||
* Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the interceptor to register metrics. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to enable the interceptor to register metrics.
should be to enable the selector to register metrics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for catching it, fixed.
Converted to draft as per https://lists.apache.org/thread/hwrrztts7j8jwwkk740848hd6y8p2c0x Giving the community a few days to review the proposed changes. If no objections are raised, I'll mark it back as ready. |
Since no objections were raised and none of the proposed changes to KIP-877 impact |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, I left a few comments
@@ -1723,7 +1723,7 @@ class ReplicaManager(val config: KafkaConfig, | |||
metadata => findPreferredReadReplica(partition, metadata, params.replicaId, fetchInfo.fetchOffset, fetchTimeMs)) | |||
|
|||
if (preferredReadReplica.isDefined) { | |||
replicaSelectorOpt.foreach { selector => | |||
replicaSelectorPlugin.foreach { selector => | |||
debug(s"Replica selector ${selector.getClass.getSimpleName} returned preferred replica " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs updating, now selector
is of type Plugin
here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I understand is correct or not.
I changed the debug message to debug(s"Replica selector plugin ${selector.getClass.getSimpleName} returned preferred replica " + s"${preferredReadReplica.get} for ${params.clientMetadata}")
If I am misunderstanding , please correct me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't it going to print Replica selector Plugin returned preferred replica
now?
Instead we want Replica selector RackAwareReplicaSelector returned preferred replica
so I think we need to call ${selector.get.getClass.getSimpleName}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, you are correct. Thanks for catching it.
f1d62cb is the fix.
All comments are addressed. Thanks for your review!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks for the PR!
ReplicaSelector implementations can implement Monitorable to register their own metrics. Reviewers: Mickael Maison <[email protected]>, Ken Huang <[email protected]>
config.replicaSelectorClassName.map { className => | ||
val tmpReplicaSelector: ReplicaSelector = Utils.newInstance(className, classOf[ReplicaSelector]) | ||
tmpReplicaSelector.configure(config.originals()) | ||
tmpReplicaSelector | ||
Plugin.wrapInstance(tmpReplicaSelector, metrics, className) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just noticed that this is not correct. Instead of className
, the second argument should be the name of the configuration: REPLICA_SELECTOR_CLASS_CONFIG
.
I fixed that in my PR adding KIP-877 support to the authorizer: #19050
ReplicaSelector implementations can implement Monitorable to register their own metrics.
Reviewers: Mickael Maison [email protected], Ken Huang [email protected]