-
Notifications
You must be signed in to change notification settings - Fork 14.3k
KAFKA-18826: Add global thread metrics #18953
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
@@ -144,6 +157,51 @@ public void tearDown() throws Exception { | |||
if (!streamsSecondApplicationProperties.isEmpty()) { | |||
IntegrationTestUtils.purgeLocalStreamsState(streamsSecondApplicationProperties); | |||
} | |||
if (globalStoreIterator != null) { | |||
globalStoreIterator.close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Required from test - explaination below adding the global store
|
||
@ParameterizedTest | ||
@ValueSource(strings = {"INFO", "DEBUG", "TRACE"}) | ||
public void shouldPushGlobalThreadMetricsToBroker(final String recordingLevel) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New test for validating broker plugin emits the global thread metrics. I thought of updating the existing broker plugin test, but it's busy enough as is.
|
||
try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) { | ||
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); | ||
|
||
final List<MetricName> streamsThreadMetrics = streams.metrics().values().stream().map(Metric::metricName) | ||
.filter(metricName -> metricName.tags().containsKey("thread-id")).collect(Collectors.toList()); | ||
.filter(metricName -> metricName.tags().containsKey("thread-id")).toList(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
side cleanup
|
||
final List<MetricName> streamsClientMetrics = streams.metrics().values().stream().map(Metric::metricName) | ||
.filter(metricName -> metricName.group().equals("stream-metrics")).collect(Collectors.toList()); | ||
.filter(metricName -> metricName.group().equals("stream-metrics")).toList(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
side cleanup
final List<MetricName> consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).collect(Collectors.toList()); | ||
final List<MetricName> adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).collect(Collectors.toList()); | ||
final List<MetricName> consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).toList(); | ||
final List<MetricName> adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).toList(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
side cleanup
@@ -350,10 +408,10 @@ public void passedMetricsShouldNotLeakIntoClientMetrics() throws Exception { | |||
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); | |||
|
|||
final List<MetricName> streamsThreadMetrics = streams.metrics().values().stream().map(Metric::metricName) | |||
.filter(metricName -> metricName.tags().containsKey("thread-id")).collect(Collectors.toList()); | |||
.filter(metricName -> metricName.tags().containsKey("thread-id")).toList(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
side cleanup here and the next
@@ -525,7 +619,7 @@ public void exportMetrics(final AuthorizableRequestContext context, final Client | |||
.stream() | |||
.flatMap(rm -> rm.getScopeMetricsList().stream()) | |||
.flatMap(sm -> sm.getMetricsList().stream()) | |||
.map(metric -> metric.getGauge()) | |||
.map(org.apache.kafka.shaded.io.opentelemetry.proto.metrics.v1.Metric::getGauge) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
side cleanup as pointed out by IntelliJ not sure using a method handle here and below is better than using a lambda in this case due to needing the fully qualified name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The change LGTM. Little bit annoying that we have naming "conflict" for Metric
but using method reference seems better.
@@ -37,10 +38,10 @@ public class StreamsThreadMetricsDelegatingReporter implements MetricsReporter { | |||
private final String stateUpdaterThreadId; | |||
|
|||
|
|||
public StreamsThreadMetricsDelegatingReporter(final Consumer<byte[], byte[]> consumer, final String threadId, final String stateUpdaterThreadId) { | |||
public StreamsThreadMetricsDelegatingReporter(final Consumer<byte[], byte[]> consumer, final String threadId, final Optional<String> stateUpdaterThreadId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the reporting process is exactly the same for the global thread as a stream thread, IMHO it made sense to re-use the existing reporter class. But it seems better to have the stateUpdaterThreadId
represented as an Optional
to signal that it may not be present
@Override | ||
public void process(final Record<String, String> record) { | ||
store.put(record.key(), record.value()); | ||
globalStoreIterator = store.all(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The store iterator is intentionally not closed here as it needs to be open during the test so the Streams app will emit the org.apache.kafka.stream.state.oldest.iterator.open.since.ms
metric that is expected. So the globalStoreIterator
is a global variable (pun not intended) so it can be closed in the tearDown
method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Uff... quick a hack. But I guess difficult (impossible) to do differently?
We should add a comment about it though, ie, just c&p your PR comment directly (it's a good explanation, and avoids that somebody spends time tryin to "fix" it, just breaking the test and getting confused, and wasting time).
We might also want to add a @SuppressWarning("resource")
to process(...)
method? IntelliJ should complain about not using try-with-resource for the iterator?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regarding @SuppressWarning
I wasn't seeing one ,so I didn't add it. If you'd prefer to have on anyway I'll do so.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems my IntelliJ warn setting are different. Should be ok w/o it, too.
this.consumer = Objects.requireNonNull(consumer); | ||
this.threadId = Objects.requireNonNull(threadId); | ||
this.stateUpdaterThreadId = Objects.requireNonNull(stateUpdaterThreadId); | ||
this.stateUpdaterThreadId = stateUpdaterThreadId.orElse(""); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thinking this should be stateUpdaterThreadId.orElse(null);
instead - thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure. Might actually be cleaner to make this.stateUpdaterThreadId
and Optional
by itself, and update the code accordingly when it's used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM. A few minor comments/questions.
this.consumer = Objects.requireNonNull(consumer); | ||
this.threadId = Objects.requireNonNull(threadId); | ||
this.stateUpdaterThreadId = Objects.requireNonNull(stateUpdaterThreadId); | ||
this.stateUpdaterThreadId = stateUpdaterThreadId.orElse(""); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure. Might actually be cleaner to make this.stateUpdaterThreadId
and Optional
by itself, and update the code accordingly when it's used?
@@ -525,7 +619,7 @@ public void exportMetrics(final AuthorizableRequestContext context, final Client | |||
.stream() | |||
.flatMap(rm -> rm.getScopeMetricsList().stream()) | |||
.flatMap(sm -> sm.getMetricsList().stream()) | |||
.map(metric -> metric.getGauge()) | |||
.map(org.apache.kafka.shaded.io.opentelemetry.proto.metrics.v1.Metric::getGauge) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The change LGTM. Little bit annoying that we have naming "conflict" for Metric
but using method reference seems better.
Stores.inMemoryKeyValueStore("iq-test-store"), | ||
Serdes.String(), | ||
Serdes.String() | ||
), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Formatting seems to be off? I would recommend
builder.addGlobalStore(
Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("iq-test-store"),
Serdes.String(),
Serdes.String()
),
globalStoreTopic,
[...]
):
@Override | ||
public void process(final Record<String, String> record) { | ||
store.put(record.key(), record.value()); | ||
globalStoreIterator = store.all(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Uff... quick a hack. But I guess difficult (impossible) to do differently?
We should add a comment about it though, ie, just c&p your PR comment directly (it's a good explanation, and avoids that somebody spends time tryin to "fix" it, just breaking the test and getting confused, and wasting time).
We might also want to add a @SuppressWarning("resource")
to process(...)
method? IntelliJ should complain about not using try-with-resource for the iterator?
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
441faac
to
d77afd8
Compare
@mjsax comments addressed |
.../java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
Outdated
Show resolved
Hide resolved
Integrates global thread-specific metrics reporting via `StreamsThreadMetricsDelegatingReporter`. Includes unit and integration tests to validate telemetry push for global stores. Refactors topology creation to conditionally include global stores for testing purposes.
d77afd8
to
a9ea85e
Compare
@mjsax comments addressed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks Bill. LGTM.
Merged #18953 into trunk |
When implementing [KIP-1091](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1091%3A+Improved+Kafka+Streams+operator+metrics) metrics for the Global Stream thread was overlooked. This ticket adds the Global Thread metrics so they are available via the KIP-1076 process of adding external Kafka metrics. The existing integration test has been updated to confirm GlobalThread metrics are sent via the broker plugin. Reviewers: Matthias Sax <[email protected]>
When implementing KIP-1091 metrics for the Global Stream thread was overlooked. This ticket adds the Global Thread metrics so they are available via the KIP-1076 process of adding external Kafka metrics.
The existing integration test has been updated to confirm GlobalThread metrics are sent via the broker plugin.