Commit bf68f33

Address review comments
1 parent 4660b02 commit bf68f33

2 files changed: 12 additions and 20 deletions


streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java

Lines changed: 8 additions & 17 deletions
@@ -24,7 +24,6 @@
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Uuid;
@@ -42,7 +41,6 @@
 import org.apache.kafka.streams.ClientInstanceIds;
 import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
@@ -167,10 +165,6 @@ public void tearDown() throws Exception {
     public void shouldPushGlobalThreadMetricsToBroker(final String recordingLevel) throws Exception {
         streamsApplicationProperties = props(true);
         streamsApplicationProperties.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, recordingLevel);
-        IntegrationTestUtils.produceKeyValuesSynchronously(globalStoreTopic,
-                List.of(KeyValue.pair("1", "one"), KeyValue.pair("2", "two"), KeyValue.pair("3", "three")),
-                producerProperties(),
-                cluster.time);
         final Topology topology = simpleTopology(true);
         subscribeForStreamsMetrics();
         try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) {
@@ -449,14 +443,6 @@ private static Stream<Arguments> multiTaskParameters() {
             Arguments.of(false));
     }
 
-    private Properties producerProperties() {
-        final Properties properties = new Properties();
-        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
-        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serdes.String().serializer().getClass());
-        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serdes.String().serializer().getClass());
-        return properties;
-    }
-
     private Properties props(final boolean stateUpdaterEnabled) {
         return props(mkObjectProperties(mkMap(mkEntry(StreamsConfig.InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled))));
     }
@@ -486,7 +472,8 @@ private Topology complexTopology() {
     }
 
     private void addGlobalStore(final StreamsBuilder builder) {
-        builder.addGlobalStore(Stores.keyValueStoreBuilder(
+        builder.addGlobalStore(
+            Stores.keyValueStoreBuilder(
                 Stores.inMemoryKeyValueStore("iq-test-store"),
                 Serdes.String(),
                 Serdes.String()
@@ -496,16 +483,20 @@ private void addGlobalStore(final StreamsBuilder builder) {
             () -> new Processor<>() {
                 private KeyValueStore<String, String> store;
 
+                // The store iterator is intentionally not closed here: it must stay
+                // open during the test so the Streams app emits the expected
+                // org.apache.kafka.stream.state.oldest.iterator.open.since.ms metric.
+                // The iterator is therefore kept in the globalStoreIterator global
+                // variable (pun not intended) so it can be closed in tearDown.
                 @Override
                 public void init(final ProcessorContext<Void, Void> context) {
                     store = context.getStateStore("iq-test-store");
+                    globalStoreIterator = store.all();
                 }
 
                 @Override
                 public void process(final Record<String, String> record) {
                     store.put(record.key(), record.value());
-                    globalStoreIterator = store.all();
-                    globalStoreIterator.forEachRemaining(kv -> System.out.printf("key %s value %s%n", kv.key, kv.value));
                 }
             });
     }
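
Note on the change above: the comment added to init() relies on the test's tearDown method (visible only in the hunk header) to release the iterator it opens. That clean-up code is outside this diff; the fragment below is a minimal sketch of what it could look like, assuming the globalStoreIterator field is a KeyValueIterator<String, String> and the test uses JUnit 5. The class name is hypothetical and only illustrates the close-in-teardown pattern.

import org.apache.kafka.streams.state.KeyValueIterator;
import org.junit.jupiter.api.AfterEach;

// Hypothetical fragment, not the actual test class: it only illustrates closing
// the iterator that the global-store processor's init() leaves open on purpose.
class GlobalStoreIteratorCleanupSketch {

    private KeyValueIterator<String, String> globalStoreIterator;

    @AfterEach
    void tearDown() {
        // Close the iterator that was kept open so the
        // org.apache.kafka.stream.state.oldest.iterator.open.since.ms metric
        // stays observable while the test runs.
        if (globalStoreIterator != null) {
            globalStoreIterator.close();
        }
    }
}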

streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java

Lines changed: 4 additions & 3 deletions
@@ -35,13 +35,13 @@ public class StreamsThreadMetricsDelegatingReporter implements MetricsReporter {
     private static final String THREAD_ID_TAG = "thread-id";
     private final Consumer<byte[], byte[]> consumer;
     private final String threadId;
-    private final String stateUpdaterThreadId;
+    private final Optional<String> stateUpdaterThreadId;
 
 
     public StreamsThreadMetricsDelegatingReporter(final Consumer<byte[], byte[]> consumer, final String threadId, final Optional<String> stateUpdaterThreadId) {
         this.consumer = Objects.requireNonNull(consumer);
         this.threadId = Objects.requireNonNull(threadId);
-        this.stateUpdaterThreadId = stateUpdaterThreadId.orElse("");
+        this.stateUpdaterThreadId = stateUpdaterThreadId;
         log.debug("Creating MetricsReporter for threadId {} and stateUpdaterId {}", threadId, stateUpdaterThreadId);
     }
 
@@ -60,7 +60,8 @@ public void metricChange(final KafkaMetric metric) {
 
     private boolean tagMatchStreamOrStateUpdaterThreadId(final KafkaMetric metric) {
         final Map<String, String> tags = metric.metricName().tags();
-        final boolean shouldInclude = tags.containsKey(THREAD_ID_TAG) && (tags.get(THREAD_ID_TAG).equals(threadId) || tags.get(THREAD_ID_TAG).equals(stateUpdaterThreadId));
+        final boolean shouldInclude = tags.containsKey(THREAD_ID_TAG) && (tags.get(THREAD_ID_TAG).equals(threadId) ||
+                Optional.of(tags.get(THREAD_ID_TAG)).equals(stateUpdaterThreadId));
         if (!shouldInclude) {
             log.trace("Rejecting metric {}", metric.metricName());
         }
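
With stateUpdaterThreadId now stored as an Optional<String>, the match in tagMatchStreamOrStateUpdaterThreadId holds only when the optional is present and wraps the same thread-id tag, so an absent state updater thread can never match (Optional.equals compares the contained values). The standalone sketch below reproduces that predicate outside the reporter; the class and method names are illustrative, not part of the Kafka codebase.

import java.util.Map;
import java.util.Optional;

// Illustrative sketch of the Optional-based thread-id filter used above.
public class ThreadIdMatchSketch {

    private static final String THREAD_ID_TAG = "thread-id";

    // Include a metric if its thread-id tag equals either the stream thread id
    // or the (possibly absent) state updater thread id.
    static boolean matches(final Map<String, String> tags,
                           final String threadId,
                           final Optional<String> stateUpdaterThreadId) {
        return tags.containsKey(THREAD_ID_TAG)
            && (tags.get(THREAD_ID_TAG).equals(threadId)
                || Optional.of(tags.get(THREAD_ID_TAG)).equals(stateUpdaterThreadId));
    }

    public static void main(final String[] args) {
        // Stream thread match: true
        System.out.println(matches(Map.of("thread-id", "stream-thread-1"), "stream-thread-1", Optional.empty()));
        // State updater match when the optional is present: true
        System.out.println(matches(Map.of("thread-id", "state-updater-1"), "stream-thread-1", Optional.of("state-updater-1")));
        // No state updater thread configured: false
        System.out.println(matches(Map.of("thread-id", "state-updater-1"), "stream-thread-1", Optional.empty()));
    }
}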
