Skip to content

Commit 875ffb3

Browse files
garfailedonobc
authored andcommitted
Respect executor set on container props
Makes sure that the ConcurrentPulsarListenerContainerFactory copies the task executor from the factory container properties to the container instance properties. Backports the fix for #1103 from 6dcc813. The change was manually applied as the concurrent unit test class diverged heavily. See #1103 Signed-off-by: Daniel Szabo <[email protected]>
1 parent 1feb404 commit 875ffb3

File tree

2 files changed

+63
-1
lines changed

2 files changed

+63
-1
lines changed

spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2023 the original author or authors.
2+
* Copyright 2022-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -33,6 +33,7 @@
3333
* @author Soby Chacko
3434
* @author Chris Bono
3535
* @author Alexander Preuß
36+
* @author Daniel Szabo
3637
*/
3738
public class ConcurrentPulsarListenerContainerFactory<T>
3839
extends AbstractPulsarListenerContainerFactory<ConcurrentPulsarMessageListenerContainer<T>, T> {
@@ -74,6 +75,7 @@ protected ConcurrentPulsarMessageListenerContainer<T> createContainerInstance(Pu
7475
PulsarContainerProperties properties = new PulsarContainerProperties();
7576
properties.setSchemaResolver(this.getContainerProperties().getSchemaResolver());
7677
properties.setTopicResolver(this.getContainerProperties().getTopicResolver());
78+
properties.setConsumerTaskExecutor(this.getContainerProperties().getConsumerTaskExecutor());
7779

7880
var parentTxnProps = this.getContainerProperties().transactions();
7981
var childTxnProps = properties.transactions();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright 2025-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.pulsar.config;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.Mockito.mock;
21+
import static org.mockito.Mockito.when;
22+
23+
import org.junit.jupiter.api.Nested;
24+
import org.junit.jupiter.api.Test;
25+
26+
import org.springframework.core.task.AsyncTaskExecutor;
27+
import org.springframework.pulsar.core.PulsarConsumerFactory;
28+
import org.springframework.pulsar.listener.PulsarContainerProperties;
29+
30+
/**
31+
* Unit tests for {@link ConcurrentPulsarListenerContainerFactory}.
32+
*
33+
* @author Daniel Szabo
34+
* @author Chris Bono
35+
*/
36+
class ConcurrentPulsarListenerContainerFactoryTests {
37+
38+
@Nested
39+
class ConsumerTaskExecutorFrom {
40+
41+
@Test
42+
@SuppressWarnings("unchecked")
43+
void factoryPropsUsedWhenSpecified() {
44+
var factoryProps = new PulsarContainerProperties();
45+
AsyncTaskExecutor executor = mock();
46+
factoryProps.setConsumerTaskExecutor(executor);
47+
var containerFactory = new ConcurrentPulsarListenerContainerFactory<String>(
48+
mock(PulsarConsumerFactory.class), factoryProps);
49+
var endpoint = mock(PulsarListenerEndpoint.class);
50+
when(endpoint.getConcurrency()).thenReturn(1);
51+
52+
var container = containerFactory.createContainerInstance(endpoint);
53+
assertThat(container.getContainerProperties())
54+
.extracting(PulsarContainerProperties::getConsumerTaskExecutor)
55+
.isSameAs(executor);
56+
}
57+
58+
}
59+
60+
}

0 commit comments

Comments
 (0)