Skip to content

Commit a9ea85e

Browse files
committed
Updates per comments
1 parent bf68f33 commit a9ea85e

File tree

2 files changed

+5
-5
lines changed

2 files changed

+5
-5
lines changed

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,7 @@ private Topology complexTopology() {
471471
return builder.build();
472472
}
473473

474+
474475
private void addGlobalStore(final StreamsBuilder builder) {
475476
builder.addGlobalStore(
476477
Stores.keyValueStoreBuilder(
@@ -481,22 +482,21 @@ private void addGlobalStore(final StreamsBuilder builder) {
481482
globalStoreTopic,
482483
Consumed.with(Serdes.String(), Serdes.String()),
483484
() -> new Processor<>() {
484-
private KeyValueStore<String, String> store;
485485

486486
// The store iterator is intentionally not closed here as it needs
487487
// to be open during the test, so the Streams app will emit the
488488
// org.apache.kafka.stream.state.oldest.iterator.open.since.ms metric
489489
// that is expected. So the globalStoreIterator is a global variable
490490
// (pun not intended), so it can be closed in the tearDown method.
491+
@SuppressWarnings("unchecked")
491492
@Override
492493
public void init(final ProcessorContext<Void, Void> context) {
493-
store = context.getStateStore("iq-test-store");
494-
globalStoreIterator = store.all();
494+
globalStoreIterator = ((KeyValueStore<String, String>)context.getStateStore("iq-test-store")).all();
495495
}
496496

497497
@Override
498498
public void process(final Record<String, String> record) {
499-
store.put(record.key(), record.value());
499+
// no-op
500500
}
501501
});
502502
}

streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void metricChange(final KafkaMetric metric) {
6161
private boolean tagMatchStreamOrStateUpdaterThreadId(final KafkaMetric metric) {
6262
final Map<String, String> tags = metric.metricName().tags();
6363
final boolean shouldInclude = tags.containsKey(THREAD_ID_TAG) && (tags.get(THREAD_ID_TAG).equals(threadId) ||
64-
Optional.of(tags.get(THREAD_ID_TAG)).equals(stateUpdaterThreadId));
64+
Optional.ofNullable(tags.get(THREAD_ID_TAG)).equals(stateUpdaterThreadId));
6565
if (!shouldInclude) {
6666
log.trace("Rejecting metric {}", metric.metricName());
6767
}

0 commit comments

Comments
 (0)