 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ClientMetricsCommand;
@@ -98,8 +104,11 @@ public class KafkaStreamsTelemetryIntegrationTest {
     private String outputTopicTwoPartitions;
     private String inputTopicOnePartition;
     private String outputTopicOnePartition;
+    private String globalStoreTopic;
+    private Uuid globalStoreConsumerInstanceId;
     private Properties streamsApplicationProperties = new Properties();
     private Properties streamsSecondApplicationProperties = new Properties();
+    private KeyValueIterator<String, String> globalStoreIterator;

     private static EmbeddedKafkaCluster cluster;
     private static final List<TestingMetricsInterceptingConsumer<byte[], byte[]>> INTERCEPTING_CONSUMERS = new ArrayList<>();
@@ -125,10 +134,12 @@ public void setUp(final TestInfo testInfo) throws InterruptedException {
         outputTopicTwoPartitions = appId + "-output-two";
         inputTopicOnePartition = appId + "-input-one";
         outputTopicOnePartition = appId + "-output-one";
+        globalStoreTopic = appId + "-global-store";
         cluster.createTopic(inputTopicTwoPartitions, 2, 1);
         cluster.createTopic(outputTopicTwoPartitions, 2, 1);
         cluster.createTopic(inputTopicOnePartition, 1, 1);
         cluster.createTopic(outputTopicOnePartition, 1, 1);
+        cluster.createTopic(globalStoreTopic, 2, 1);
     }

     @AfterAll
@@ -144,6 +155,47 @@ public void tearDown() throws Exception {
         if (!streamsSecondApplicationProperties.isEmpty()) {
             IntegrationTestUtils.purgeLocalStreamsState(streamsSecondApplicationProperties);
         }
+        if (globalStoreIterator != null) {
+            globalStoreIterator.close();
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"INFO", "DEBUG", "TRACE"})
+    public void shouldPushGlobalThreadMetricsToBroker(final String recordingLevel) throws Exception {
+        streamsApplicationProperties = props(true);
+        streamsApplicationProperties.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, recordingLevel);
+        final Topology topology = simpleTopology(true);
+        subscribeForStreamsMetrics();
+        try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) {
+            IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+            final ClientInstanceIds clientInstanceIds = streams.clientInstanceIds(Duration.ofSeconds(60));
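+            // The global thread's consumer registers under a client id ending in
+            // "GlobalStreamThread-global-consumer"; use that suffix to pick out its instance id.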
+            for (final Map.Entry<String, Uuid> instanceId : clientInstanceIds.consumerInstanceIds().entrySet()) {
+                final String instanceIdKey = instanceId.getKey();
+                if (instanceIdKey.endsWith("GlobalStreamThread-global-consumer")) {
+                    globalStoreConsumerInstanceId = instanceId.getValue();
+                }
+            }
+
+            assertNotNull(globalStoreConsumerInstanceId);
+            LOG.info("Global consumer instance id {}", globalStoreConsumerInstanceId);
+            TestUtils.waitForCondition(
+                () -> !TelemetryPlugin.SUBSCRIBED_METRICS.getOrDefault(globalStoreConsumerInstanceId, Collections.emptyList()).isEmpty(),
+                30_000,
+                "Never received subscribed metrics"
+            );
+
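+            // Map embedded metric names to their telemetry form: hyphens become dots, the
+            // "-metrics" suffix is dropped from the group, and "org.apache.kafka." is prepended,
+            // e.g. "poll-ratio" in group "stream-thread-metrics" becomes
+            // "org.apache.kafka.stream.thread.poll.ratio".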
+            final List<String> expectedGlobalMetrics = streams.metrics().values().stream().map(Metric::metricName)
+                    .filter(metricName -> metricName.tags().containsKey("thread-id") &&
+                            metricName.tags().get("thread-id").endsWith("-GlobalStreamThread")).map(mn -> {
+                        final String name = mn.name().replace('-', '.');
+                        final String group = mn.group().replace("-metrics", "").replace('-', '.');
+                        return "org.apache.kafka." + group + "." + name;
+                    }).filter(name -> !name.equals("org.apache.kafka.stream.thread.state")) // telemetry reporter filters out string metrics
+                    .sorted().toList();
+            final List<String> actualGlobalMetrics = new ArrayList<>(TelemetryPlugin.SUBSCRIBED_METRICS.get(globalStoreConsumerInstanceId));
+            assertEquals(expectedGlobalMetrics, actualGlobalMetrics);
+        }
     }

     @ParameterizedTest
@@ -152,7 +204,7 @@ public void shouldPushMetricsToBroker(final String recordingLevel) throws Except
         // End-to-end test validating metrics pushed to broker
         streamsApplicationProperties = props(true);
         streamsApplicationProperties.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, recordingLevel);
-        final Topology topology = simpleTopology();
+        final Topology topology = simpleTopology(false);
         subscribeForStreamsMetrics();
         try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) {
             IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
@@ -215,21 +267,21 @@ public void shouldPushMetricsToBroker(final String recordingLevel) throws Except
     public void shouldPassMetrics(final String topologyType, final boolean stateUpdaterEnabled) throws Exception {
         // Streams metrics should get passed to Admin and Consumer
         streamsApplicationProperties = props(stateUpdaterEnabled);
-        final Topology topology = topologyType.equals("simple") ? simpleTopology() : complexTopology();
+        final Topology topology = topologyType.equals("simple") ? simpleTopology(false) : complexTopology();

         try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) {
             IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);

             final List<MetricName> streamsThreadMetrics = streams.metrics().values().stream().map(Metric::metricName)
-                .filter(metricName -> metricName.tags().containsKey("thread-id")).collect(Collectors.toList());
+                .filter(metricName -> metricName.tags().containsKey("thread-id")).toList();

             final List<MetricName> streamsClientMetrics = streams.metrics().values().stream().map(Metric::metricName)
-                .filter(metricName -> metricName.group().equals("stream-metrics")).collect(Collectors.toList());
+                .filter(metricName -> metricName.group().equals("stream-metrics")).toList();



-            final List<MetricName> consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).collect(Collectors.toList());
-            final List<MetricName> adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).collect(Collectors.toList());
+            final List<MetricName> consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).toList();
+            final List<MetricName> adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).toList();


             assertEquals(streamsThreadMetrics.size(), consumerPassedStreamThreadMetricNames.size());
@@ -259,10 +311,10 @@ public void shouldPassCorrectMetricsDynamicInstances(final boolean stateUpdaterE
         IntegrationTestUtils.startApplicationAndWaitUntilRunning(streamsOne);

         final List<MetricName> streamsTaskMetricNames = streamsOne.metrics().values().stream().map(Metric::metricName)
-            .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+            .filter(metricName -> metricName.tags().containsKey("task-id")).toList();

         final List<MetricName> consumerPassedStreamTaskMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName)
-            .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+            .filter(metricName -> metricName.tags().containsKey("task-id")).toList();

         /*
         With only one instance, Kafka Streams should register task metrics for all tasks 0_0, 0_1, 1_0, 1_1
@@ -293,24 +345,24 @@ public void shouldPassCorrectMetricsDynamicInstances(final boolean stateUpdaterE
         );

         final List<MetricName> streamsOneTaskMetrics = streamsOne.metrics().values().stream().map(Metric::metricName)
-            .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+            .filter(metricName -> metricName.tags().containsKey("task-id")).toList();
         final List<MetricName> streamsOneStateMetrics = streamsOne.metrics().values().stream().map(Metric::metricName)
-            .filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
+            .filter(metricName -> metricName.group().equals("stream-state-metrics")).toList();

         final List<MetricName> consumerOnePassedTaskMetrics = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT)
-            .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+            .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.tags().containsKey("task-id")).toList();
         final List<MetricName> consumerOnePassedStateMetrics = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT)
-            .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
+            .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.group().equals("stream-state-metrics")).toList();

         final List<MetricName> streamsTwoTaskMetrics = streamsTwo.metrics().values().stream().map(Metric::metricName)
-            .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+            .filter(metricName -> metricName.tags().containsKey("task-id")).toList();
         final List<MetricName> streamsTwoStateMetrics = streamsTwo.metrics().values().stream().map(Metric::metricName)
-            .filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
+            .filter(metricName -> metricName.group().equals("stream-state-metrics")).toList();

         final List<MetricName> consumerTwoPassedTaskMetrics = INTERCEPTING_CONSUMERS.get(SECOND_INSTANCE_CLIENT)
-            .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+            .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.tags().containsKey("task-id")).toList();
         final List<MetricName> consumerTwoPassedStateMetrics = INTERCEPTING_CONSUMERS.get(SECOND_INSTANCE_CLIENT)
-            .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
+            .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.group().equals("stream-state-metrics")).toList();
         /*
         Confirm pre-existing KafkaStreams instance one only passes metrics for its tasks and has no metrics for previous tasks
         */
@@ -350,10 +402,10 @@ public void passedMetricsShouldNotLeakIntoClientMetrics() throws Exception {
         IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);

         final List<MetricName> streamsThreadMetrics = streams.metrics().values().stream().map(Metric::metricName)
-            .filter(metricName -> metricName.tags().containsKey("thread-id")).collect(Collectors.toList());
+            .filter(metricName -> metricName.tags().containsKey("thread-id")).toList();

         final List<MetricName> streamsClientMetrics = streams.metrics().values().stream().map(Metric::metricName)
-            .filter(metricName -> metricName.group().equals("stream-metrics")).collect(Collectors.toList());
+            .filter(metricName -> metricName.group().equals("stream-metrics")).toList();

         final Map<MetricName, ? extends Metric> embeddedConsumerMetrics = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).metrics();
         final Map<MetricName, ? extends Metric> embeddedAdminMetrics = INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).metrics();
@@ -419,8 +471,41 @@ private Topology complexTopology() {
         return builder.build();
     }

-    private Topology simpleTopology() {
+
+    private void addGlobalStore(final StreamsBuilder builder) {
+        builder.addGlobalStore(
+            Stores.keyValueStoreBuilder(
+                Stores.inMemoryKeyValueStore("iq-test-store"),
+                Serdes.String(),
+                Serdes.String()
+            ),
+            globalStoreTopic,
+            Consumed.with(Serdes.String(), Serdes.String()),
+            () -> new Processor<>() {
+
+                // The store iterator is intentionally not closed here: it must stay open for the
+                // duration of the test so the Streams app emits the expected
+                // org.apache.kafka.stream.state.oldest.iterator.open.since.ms metric. The iterator
+                // is instead kept in a global variable (pun not intended), globalStoreIterator,
+                // so it can be closed in the tearDown method.
+                @SuppressWarnings("unchecked")
+                @Override
+                public void init(final ProcessorContext<Void, Void> context) {
+                    globalStoreIterator = ((KeyValueStore<String, String>) context.getStateStore("iq-test-store")).all();
+                }
+
+                @Override
+                public void process(final Record<String, String> record) {
+                    // no-op
+                }
+            });
+    }
+
+    private Topology simpleTopology(final boolean includeGlobalStore) {
         final StreamsBuilder builder = new StreamsBuilder();
+        if (includeGlobalStore) {
+            addGlobalStore(builder);
+        }
         builder.stream(inputTopicOnePartition, Consumed.with(Serdes.String(), Serdes.String()))
             .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
             .to(outputTopicOnePartition, Produced.with(Serdes.String(), Serdes.String()));
@@ -449,7 +534,7 @@ public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> con
 
         @Override
         public Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, Object> config) {
-            return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
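+            // Intercept the global consumer as well, so tests can inspect the metrics passed to it.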
+            return new TestingMetricsInterceptingConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
         }

         @Override
@@ -525,7 +610,7 @@ public void exportMetrics(final AuthorizableRequestContext context, final Client
                 .stream()
                 .flatMap(rm -> rm.getScopeMetricsList().stream())
                 .flatMap(sm -> sm.getMetricsList().stream())
-                .map(metric -> metric.getGauge())
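+                // fully qualified, presumably to avoid a clash with org.apache.kafka.common.Metric,
+                // which this file also imports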
+                .map(org.apache.kafka.shaded.io.opentelemetry.proto.metrics.v1.Metric::getGauge)
                 .flatMap(gauge -> gauge.getDataPointsList().stream())
                 .flatMap(numberDataPoint -> numberDataPoint.getAttributesList().stream())
                 .filter(keyValue -> keyValue.getKey().equals("process_id"))
@@ -539,7 +624,7 @@ public void exportMetrics(final AuthorizableRequestContext context, final Client
                 .stream()
                 .flatMap(rm -> rm.getScopeMetricsList().stream())
                 .flatMap(sm -> sm.getMetricsList().stream())
-                .map(metric -> metric.getName())
+                .map(org.apache.kafka.shaded.io.opentelemetry.proto.metrics.v1.Metric::getName)
                 .sorted()
                 .collect(Collectors.toList());
             LOG.info("Found metrics {} for clientId={}", metricNames, clientId);