-
Notifications
You must be signed in to change notification settings - Fork 14.4k
KAFKA-19376: Throw an error message if any unsupported feature is used with KIP-1071 #19908
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
@aliehsaeedii Please take a look |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces additional validations for the STREAMS protocol by throwing errors or logging warnings when unsupported configuration features are used.
- Tests are added to verify that unsupported features (pattern subscriptions, warmup replicas, static membership, non-default client supplier) produce the correct error messages or warnings.
- Internal code in StreamsConfig and KafkaStreams now enforce these validations, and a redundant log message is removed from StreamThread.
Reviewed Changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.
Show a summary per file
File | Description |
---|---|
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java | Added tests to validate warning logs and exception throwing for unsupported features with STREAMS protocol. |
streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java | Added tests to ensure that using pattern subscriptions and non-default client supplier with STREAMS protocol throws exceptions. |
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java | Removed redundant logging of STREAMS protocol activation. |
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java | Implemented compatibility verification for static membership, warmup replicas, and standby replicas when using STREAMS protocol. |
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java | Introduced additional unsupported feature checks and error throwing for named topologies, pattern subscriptions, and non-default kafka client supplier. |
@@ -1047,6 +1051,21 @@ private KafkaStreams(final TopologyMetadata topologyMetadata, | |||
rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, applicationConfigs); | |||
} | |||
|
|||
private void throwIfUnsupportedFeatureIsUsedWithStreamsRebalanceProtocol() { | |||
if (applicationConfigs.getString(StreamsConfig.GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.STREAMS.name)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider invoking GroupProtocol.STREAMS.name() (with parentheses) instead of GroupProtocol.STREAMS.name to correctly compare the protocol string.
if (applicationConfigs.getString(StreamsConfig.GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.STREAMS.name)) { | |
if (applicationConfigs.getString(StreamsConfig.GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.STREAMS.name())) { |
Copilot uses AI. Check for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @lucasbru - overall this LGTM but the changes here cause the KafkaStreamsTelemetryIntegrationTest
test to fail as it uses its own client supplier to get access to the passed metrics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lucasbru thanks for this patch. Two small questions remain. PTAL
|
||
private void verifyStreamsProtocolCompatibility(final boolean doLog) { | ||
if (doLog && getString(GROUP_PROTOCOL_CONFIG).equals(GroupProtocol.STREAMS.name().toLowerCase(Locale.ROOT))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should equals
be replaced by equalsIgnoreCase
, since the other checks, such as throwIfUnsupportedFeatureIsUsedWithStreamsRebalanceProtocol
and checkUnsupportedConfigsPostProcess
, use equalsIgnoreCase
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I actually added a little protected method to check if streams protocol is enabled, to eliminate this source of bugs.
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app"); | ||
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); | ||
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams"); | ||
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "static-member-1"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we test the static id with consumer prefix?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Updated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -1047,6 +1051,21 @@ private KafkaStreams(final TopologyMetadata topologyMetadata, | |||
rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, applicationConfigs); | |||
} | |||
|
|||
private void throwIfUnsupportedFeatureIsUsedWithStreamsRebalanceProtocol() { | |||
if (applicationConfigs.getString(StreamsConfig.GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.STREAMS.name)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
||
private void verifyStreamsProtocolCompatibility(final boolean doLog) { | ||
if (doLog && getString(GROUP_PROTOCOL_CONFIG).equals(GroupProtocol.STREAMS.name().toLowerCase(Locale.ROOT))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I actually added a little protected method to check if streams protocol is enabled, to eliminate this source of bugs.
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app"); | ||
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); | ||
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams"); | ||
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "static-member-1"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Updated
We should be mindful of ours users and let them know early if they are
using an unsupported feature in 4.1.
Unsupported features: