Skip to content

Commit eea905f

Browse files
authored
Merge pull request #3015 from sobychacko/gh-2985
GH-2985: Add Kafka Listener Container Customizer interfaces and docum…
2 parents cbfd3aa + f8d6caa commit eea905f

File tree

6 files changed

+372
-2
lines changed

6 files changed

+372
-2
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2024-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.stream.binder.kafka;
18+
19+
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
20+
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
21+
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
22+
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
23+
24+
/**
25+
* Extension of {@link ListenerContainerCustomizer} specific to Kafka binder.
26+
* This interface allows for customization of Kafka listener containers with
27+
* access to Kafka-specific extended consumer properties.
28+
*
29+
* @author Soby Chacko
30+
* @since 4.2.0
31+
*/
32+
public interface KafkaListenerContainerCustomizer extends ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> {
33+
34+
/**
35+
* Configure the Kafka listener container with access to extended consumer properties.
36+
*
37+
* @param container the Kafka message listener container to configure
38+
* @param destinationName the name of the destination (topic) that this listener container is associated with
39+
* @param group the consumer group name
40+
* @param extendedConsumerProperties the extended consumer properties specific to Kafka
41+
*/
42+
default void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group,
43+
ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
44+
configure(container, destinationName, group);
45+
}
46+
}

binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -758,7 +758,10 @@ else if (!extendedConsumerProperties.isBatchMode() && transMan != null) {
758758
? createBackOff(extendedConsumerProperties)
759759
: null;
760760
c.configure(messageListenerContainer, destination.getName(), consumerGroup, destinationResolver,
761-
createBackOff);
761+
createBackOff, extendedConsumerProperties);
762+
}
763+
else if (customizer instanceof KafkaListenerContainerCustomizer c) {
764+
c.configure(messageListenerContainer, destination.getName(), consumerGroup, extendedConsumerProperties);
762765
}
763766
else {
764767
((ListenerContainerCustomizer<Object>) customizer)

binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/ListenerContainerWithDlqAndRetryCustomizer.java

+32-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2021 the original author or authors.
2+
* Copyright 2021-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,6 +21,8 @@
2121
import org.apache.kafka.clients.consumer.ConsumerRecord;
2222
import org.apache.kafka.common.TopicPartition;
2323

24+
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
25+
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
2426
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
2527
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
2628
import org.springframework.lang.Nullable;
@@ -31,12 +33,21 @@
3133
* metadata.
3234
*
3335
* @author Gary Russell
36+
* @author Soby Chacko
3437
* @since 3.2
3538
*
3639
*/
3740
public interface ListenerContainerWithDlqAndRetryCustomizer
3841
extends ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> {
3942

43+
/**
44+
*
45+
* API method for configuring the container that also gives access to the {@link ExtendedConsumerProperties} for the binding.
46+
*
47+
* @param container the container.
48+
* @param destinationName the destination name.
49+
* @param group the consumer group.
50+
*/
4051
@Override
4152
default void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group) {
4253
}
@@ -55,6 +66,26 @@ void configure(AbstractMessageListenerContainer<?, ?> container, String destinat
5566
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
5667
@Nullable BackOff backOff);
5768

69+
/**
70+
*
71+
* API method for configuring the container that also gives access to the {@link ExtendedConsumerProperties} for the binding.
72+
*
73+
* @param container the container.
74+
* @param destinationName the destination name.
75+
* @param group the consumer group.
76+
* @param dlqDestinationResolver a destination resolver for the dead letter topic (if
77+
* enableDlq).
78+
* @param backOff the backOff using retry properties (if configured).
79+
* @param extendedConsumerProperties extended binding consumer properties.
80+
*
81+
* @since 4.2.0
82+
*/
83+
default void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group,
84+
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
85+
@Nullable BackOff backOff, ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
86+
configure(container, destinationName, group, dlqDestinationResolver, backOff);
87+
}
88+
5889
/**
5990
* Return false to move retries and DLQ from the binding to a customized error handler
6091
* using the retry metadata and/or a {@code DeadLetterPublishingRecoverer} when
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
* Copyright 2024-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.stream.binder.kafka;
18+
19+
import java.util.function.Consumer;
20+
21+
import org.junit.jupiter.api.Test;
22+
import org.mockito.ArgumentCaptor;
23+
24+
import org.springframework.beans.factory.annotation.Autowired;
25+
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
26+
import org.springframework.boot.test.context.SpringBootTest;
27+
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
28+
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration;
29+
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
30+
import org.springframework.context.annotation.Bean;
31+
import org.springframework.context.annotation.Configuration;
32+
import org.springframework.context.annotation.Primary;
33+
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
34+
import org.springframework.kafka.test.context.EmbeddedKafka;
35+
import org.springframework.test.annotation.DirtiesContext;
36+
37+
38+
import static org.assertj.core.api.Assertions.assertThat;
39+
import static org.mockito.ArgumentMatchers.any;
40+
import static org.mockito.ArgumentMatchers.eq;
41+
import static org.mockito.ArgumentMatchers.isNull;
42+
import static org.mockito.Mockito.mock;
43+
import static org.mockito.Mockito.timeout;
44+
import static org.mockito.Mockito.verify;
45+
46+
/**
47+
* @author Soby Chacko
48+
* @since 4.2.0
49+
*/
50+
@SpringBootTest(
51+
classes = {KafkaBinderConfiguration.class, KafkaListenerContainerCustomizerTests.TestConfig.class},
52+
properties = {
53+
"spring.cloud.function.definition=testConsumer",
54+
"spring.cloud.stream.bindings.testConsumer-in-0.destination=test-topic",
55+
"spring.cloud.stream.bindings.testConsumer-in-0.group=test-group",
56+
"spring.cloud.stream.kafka.bindings.testConsumer-in-0.consumer.enableDlq=true"
57+
}
58+
)
59+
@DirtiesContext
60+
@EmbeddedKafka(partitions = 1, topics = "test-topic")
61+
class KafkaListenerContainerCustomizerTests {
62+
63+
@Autowired
64+
private KafkaListenerContainerCustomizer compositeCustomizer;
65+
66+
@Test
67+
@SuppressWarnings("unchecked")
68+
void customizersInvoked() {
69+
KafkaListenerContainerCustomizer kafkaCustomizer =
70+
((TestConfig.CompositeCustomizer) compositeCustomizer).getKafkaCustomizer();
71+
ListenerContainerWithDlqAndRetryCustomizer dlqCustomizer =
72+
((TestConfig.CompositeCustomizer) compositeCustomizer).getDlqCustomizer();
73+
74+
ArgumentCaptor<ExtendedConsumerProperties<KafkaConsumerProperties>> kafkaPropertiesCaptor = ArgumentCaptor.forClass(ExtendedConsumerProperties.class);
75+
ArgumentCaptor<ExtendedConsumerProperties<KafkaConsumerProperties>> dlqPropertiesCaptor = ArgumentCaptor.forClass(ExtendedConsumerProperties.class);
76+
77+
verify(kafkaCustomizer, timeout(5000).times(1)).configure(
78+
any(AbstractMessageListenerContainer.class),
79+
eq("test-topic"),
80+
eq("test-group"),
81+
kafkaPropertiesCaptor.capture()
82+
);
83+
84+
verify(dlqCustomizer, timeout(5000).times(1)).configure(
85+
any(AbstractMessageListenerContainer.class),
86+
eq("test-topic"),
87+
eq("test-group"),
88+
isNull(),
89+
isNull(),
90+
dlqPropertiesCaptor.capture()
91+
);
92+
93+
ExtendedConsumerProperties<KafkaConsumerProperties> kafkaProperties = kafkaPropertiesCaptor.getValue();
94+
ExtendedConsumerProperties<KafkaConsumerProperties> dlqProperties = dlqPropertiesCaptor.getValue();
95+
96+
// Assert common properties
97+
assertThat(kafkaProperties.getBindingName()).isEqualTo("testConsumer-in-0");
98+
assertThat(dlqProperties.getBindingName()).isEqualTo("testConsumer-in-0");
99+
100+
// Assert Kafka-specific properties
101+
assertThat(kafkaProperties.getExtension())
102+
.satisfies(extension -> {
103+
assertThat(extension.isEnableDlq()).isTrue();
104+
assertThat(extension.isAutoRebalanceEnabled()).isTrue();
105+
});
106+
107+
// Assert that both captured properties are the same instance
108+
assertThat(kafkaProperties).isSameAs(dlqProperties);
109+
}
110+
111+
@Configuration
112+
@EnableAutoConfiguration
113+
static class TestConfig {
114+
115+
@Bean
116+
public Consumer<String> testConsumer() {
117+
return message -> {
118+
// Do nothing, just to trigger consumer binding
119+
};
120+
}
121+
122+
@Bean
123+
@Primary
124+
public KafkaListenerContainerCustomizer compositeCustomizer() {
125+
return new CompositeCustomizer(
126+
mock(KafkaListenerContainerCustomizer.class),
127+
mock(ListenerContainerWithDlqAndRetryCustomizer.class)
128+
);
129+
}
130+
131+
static class CompositeCustomizer implements KafkaListenerContainerCustomizer {
132+
private final KafkaListenerContainerCustomizer kafkaCustomizer;
133+
private final ListenerContainerWithDlqAndRetryCustomizer dlqCustomizer;
134+
135+
CompositeCustomizer(KafkaListenerContainerCustomizer kafkaCustomizer,
136+
ListenerContainerWithDlqAndRetryCustomizer dlqCustomizer) {
137+
this.kafkaCustomizer = kafkaCustomizer;
138+
this.dlqCustomizer = dlqCustomizer;
139+
}
140+
141+
@Override
142+
public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
143+
String group, ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
144+
kafkaCustomizer.configure(container, destinationName, group, extendedConsumerProperties);
145+
dlqCustomizer.configure(container, destinationName, group, null, null, extendedConsumerProperties);
146+
}
147+
148+
public KafkaListenerContainerCustomizer getKafkaCustomizer() {
149+
return kafkaCustomizer;
150+
}
151+
152+
public ListenerContainerWithDlqAndRetryCustomizer getDlqCustomizer() {
153+
return dlqCustomizer;
154+
}
155+
156+
@Override
157+
public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group) {
158+
159+
}
160+
}
161+
}
162+
}

docs/modules/ROOT/nav.adoc

+1
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
**** xref:kafka/kafka-binder/tombstone.adoc[]
5959
**** xref:kafka/kafka-binder/rebalance_listener.adoc[]
6060
**** xref:kafka/kafka-binder/retry-dlq.adoc[]
61+
**** xref:kafka/kafka-binder/container-cust-kafka-binder.adoc[]
6162
**** xref:kafka/kafka-binder/cons-prod-config-cust.adoc[]
6263
**** xref:kafka/kafka-binder/admin-client-config-cust.adoc[]
6364
**** xref:kafka/kafka-binder/custom-health-ind.adoc[]

0 commit comments

Comments
 (0)