Skip to content

Commit 5029094

Browse files
committed
Refine Topic creation for ChannelTopic and PatternTopic.
We now expose factory methods to construct ChannelTopic and PatternTopic from the Topic interface. See #3131
1 parent 0de3af0 commit 5029094

File tree

5 files changed

+44
-23
lines changed

5 files changed

+44
-23
lines changed

Diff for: src/main/antora/modules/ROOT/pages/redis/pubsub.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ XML::
162162
----
163163
======
164164

165-
NOTE: The listener topic can be either a channel (for example, `topic="chatroom"`) or a pattern (for example, `topic="*room"`). For channels, you should use the `ChannelTopic` class, and for patterns, use the `PatternTopic` class.
165+
NOTE: The listener topic can be either a channel (for example, `topic="chatroom"` respective `Topic.channel("chatroom")`) or a pattern (for example, `topic="*room"` respective `Topic.pattern("*room")`).
166166

167167
The preceding example uses the Redis namespace to declare the message listener container and automatically register the POJOs as listeners. The full-blown beans definition follows:
168168

Diff for: src/main/java/org/springframework/data/redis/listener/Topic.java

+24
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,37 @@
1919
* Topic for a Redis message. Acts a high-level abstraction on top of Redis low-level channels or patterns.
2020
*
2121
* @author Costin Leau
22+
* @author Mark Paluch
2223
*/
2324
public interface Topic {
2425

26+
/**
27+
* Create a new {@link ChannelTopic} for channel subscriptions.
28+
*
29+
* @param channelName {@link String name} of the Redis channel; must not be {@literal null}.
30+
* @return the {@link ChannelTopic} for the given {@code channelName}.
31+
* @since 3.5
32+
*/
33+
static ChannelTopic channel(String channelName) {
34+
return ChannelTopic.of(channelName);
35+
}
36+
37+
/**
38+
* Create a new {@link PatternTopic} for channel subscriptions based on a {@code pattern}.
39+
*
40+
* @param pattern {@link String pattern} used to match channels; must not be {@literal null} or empty.
41+
* @return the {@link PatternTopic} for the given {@code pattern}.
42+
* @since 3.5
43+
*/
44+
static PatternTopic pattern(String pattern) {
45+
return PatternTopic.of(pattern);
46+
}
47+
2548
/**
2649
* Returns the topic (as a String).
2750
*
2851
* @return the topic
2952
*/
3053
String getTopic();
54+
3155
}

Diff for: src/test/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainerIntegrationTests.java

+5-8
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,9 @@
2323

2424
import java.nio.ByteBuffer;
2525
import java.time.Duration;
26-
import java.util.ArrayList;
2726
import java.util.Arrays;
2827
import java.util.Collection;
2928
import java.util.Collections;
30-
import java.util.List;
31-
import java.util.Queue;
3229
import java.util.concurrent.BlockingQueue;
3330
import java.util.concurrent.CompletableFuture;
3431
import java.util.concurrent.LinkedBlockingDeque;
@@ -110,7 +107,7 @@ void shouldReceiveChannelMessages() {
110107

111108
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(connectionFactory);
112109

113-
container.receiveLater(ChannelTopic.of(CHANNEL1)) //
110+
container.receiveLater(Topic.channel(CHANNEL1)) //
114111
.doOnNext(it -> doPublish(CHANNEL1.getBytes(), MESSAGE.getBytes())) //
115112
.flatMapMany(Function.identity()) //
116113
.as(StepVerifier::create) //
@@ -153,7 +150,7 @@ public void onChannelUnsubscribed(byte[] channel, long count) {
153150
}
154151
};
155152

156-
container.receive(Collections.singletonList(ChannelTopic.of(CHANNEL1)), listener) //
153+
container.receive(Collections.singletonList(Topic.channel(CHANNEL1)), listener) //
157154
.as(StepVerifier::create) //
158155
.then(awaitSubscription(container::getActiveSubscriptions))
159156
.then(() -> doPublish(CHANNEL1.getBytes(), MESSAGE.getBytes())) //
@@ -220,7 +217,7 @@ public void onPatternUnsubscribed(byte[] pattern, long count) {
220217
}
221218
};
222219

223-
container.receive(Collections.singletonList(PatternTopic.of(PATTERN1)), listener) //
220+
container.receive(Collections.singletonList(Topic.pattern(PATTERN1)), listener) //
224221
.cast(PatternMessage.class) //
225222
.as(StepVerifier::create) //
226223
.then(awaitSubscription(container::getActiveSubscriptions))
@@ -314,10 +311,10 @@ void multipleListenShouldTrackSubscriptions() throws Exception {
314311

315312
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(connectionFactory);
316313

317-
Flux<? extends ReactiveSubscription.Message<String, String>> c1 = container.receiveLater(ChannelTopic.of(CHANNEL1))
314+
Flux<? extends ReactiveSubscription.Message<String, String>> c1 = container.receiveLater(Topic.channel(CHANNEL1))
318315
.block();
319316
Flux<? extends ReactiveSubscription.Message<String, String>> c1p1 = container
320-
.receiveLater(Arrays.asList(ChannelTopic.of(CHANNEL1), PatternTopic.of(PATTERN1)),
317+
.receiveLater(Arrays.asList(Topic.channel(CHANNEL1), PatternTopic.of(PATTERN1)),
321318
SerializationPair.fromSerializer(RedisSerializer.string()),
322319
SerializationPair.fromSerializer(RedisSerializer.string()))
323320
.block();

Diff for: src/test/java/org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainerUnitTests.java

+11-11
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ void shouldSubscribeToPattern() {
7979

8080
container = createContainer();
8181

82-
container.receive(PatternTopic.of("foo*")).as(StepVerifier::create).thenAwait().thenCancel().verify();
82+
container.receive(Topic.pattern("foo*")).as(StepVerifier::create).thenAwait().thenCancel().verify();
8383

8484
verify(subscriptionMock).pSubscribe(getByteBuffer("foo*"));
8585
}
@@ -90,7 +90,7 @@ void shouldSubscribeToMultiplePatterns() {
9090
when(subscriptionMock.receive()).thenReturn(Flux.never());
9191
container = createContainer();
9292

93-
container.receive(PatternTopic.of("foo*"), PatternTopic.of("bar*")).as(StepVerifier::create).thenRequest(1)
93+
container.receive(Topic.pattern("foo*"), Topic.pattern("bar*")).as(StepVerifier::create).thenRequest(1)
9494
.thenAwait().thenCancel().verify();
9595

9696
verify(subscriptionMock).pSubscribe(getByteBuffer("foo*"), getByteBuffer("bar*"));
@@ -102,7 +102,7 @@ void shouldSubscribeToChannel() {
102102
when(subscriptionMock.receive()).thenReturn(Flux.never());
103103
container = createContainer();
104104

105-
container.receive(ChannelTopic.of("foo")).as(StepVerifier::create).thenAwait().thenCancel().verify();
105+
container.receive(Topic.channel("foo")).as(StepVerifier::create).thenAwait().thenCancel().verify();
106106

107107
verify(subscriptionMock).subscribe(getByteBuffer("foo"));
108108
}
@@ -113,7 +113,7 @@ void shouldSubscribeToMultipleChannels() {
113113
when(subscriptionMock.receive()).thenReturn(Flux.never());
114114
container = createContainer();
115115

116-
container.receive(ChannelTopic.of("foo"), ChannelTopic.of("bar")).as(StepVerifier::create).thenAwait().thenCancel()
116+
container.receive(Topic.channel("foo"), Topic.channel("bar")).as(StepVerifier::create).thenAwait().thenCancel()
117117
.verify();
118118

119119
verify(subscriptionMock).subscribe(getByteBuffer("foo"), getByteBuffer("bar"));
@@ -127,7 +127,7 @@ void shouldEmitChannelMessage() {
127127
when(subscriptionMock.receive()).thenReturn(sink.asFlux());
128128
container = createContainer();
129129

130-
Flux<Message<String, String>> messageStream = container.receive(ChannelTopic.of("foo"));
130+
Flux<Message<String, String>> messageStream = container.receive(Topic.channel("foo"));
131131

132132
messageStream.as(StepVerifier::create).then(() -> {
133133
sink.tryEmitNext(createChannelMessage("foo", "message"));
@@ -146,7 +146,7 @@ void shouldEmitPatternMessage() {
146146
when(subscriptionMock.receive()).thenReturn(sink.asFlux());
147147
container = createContainer();
148148

149-
Flux<PatternMessage<String, String, String>> messageStream = container.receive(PatternTopic.of("foo*"));
149+
Flux<PatternMessage<String, String, String>> messageStream = container.receive(Topic.pattern("foo*"));
150150

151151
messageStream.as(StepVerifier::create).then(() -> {
152152
sink.tryEmitNext(createPatternMessage("foo*", "foo", "message"));
@@ -171,7 +171,7 @@ void shouldRegisterSubscription() {
171171
when(subscriptionMock.receive()).thenReturn(sink.asFlux());
172172
container = createContainer();
173173

174-
Flux<Message<String, String>> messageStream = container.receive(ChannelTopic.of("foo*"));
174+
Flux<Message<String, String>> messageStream = container.receive(Topic.channel("foo*"));
175175

176176
Disposable subscription = messageStream.subscribe();
177177

@@ -193,7 +193,7 @@ void shouldRegisterSubscriptionMultipleSubscribers() {
193193
when(subscriptionMock.receive()).thenReturn(sink.asFlux());
194194
container = createContainer();
195195

196-
Flux<Message<String, String>> messageStream = container.receive(new ChannelTopic("foo*"));
196+
Flux<Message<String, String>> messageStream = container.receive(Topic.channel("foo*"));
197197

198198
Disposable first = messageStream.subscribe();
199199
Disposable second = messageStream.subscribe();
@@ -216,7 +216,7 @@ void shouldUnsubscribeOnCancel() {
216216
when(subscriptionMock.receive()).thenReturn(sink.asFlux());
217217
container = createContainer();
218218

219-
Flux<PatternMessage<String, String, String>> messageStream = container.receive(PatternTopic.of("foo*"));
219+
Flux<PatternMessage<String, String, String>> messageStream = container.receive(Topic.pattern("foo*"));
220220

221221
messageStream.as(StepVerifier::create).then(() -> {
222222

@@ -240,7 +240,7 @@ void shouldTerminateSubscriptionsOnShutdown() {
240240
}));
241241
container = createContainer();
242242

243-
Flux<PatternMessage<String, String, String>> messageStream = container.receive(PatternTopic.of("foo*"));
243+
Flux<PatternMessage<String, String, String>> messageStream = container.receive(Topic.pattern("foo*"));
244244

245245
messageStream.as(StepVerifier::create).then(() -> {
246246
container.destroy();
@@ -255,7 +255,7 @@ void shouldCleanupDownstream() {
255255
when(subscriptionMock.receive()).thenReturn(sink.asFlux());
256256
container = createContainer();
257257

258-
Flux<PatternMessage<String, String, String>> messageStream = container.receive(PatternTopic.of("foo*"));
258+
Flux<PatternMessage<String, String, String>> messageStream = container.receive(Topic.pattern("foo*"));
259259

260260
messageStream.as(StepVerifier::create).then(() -> {
261261
assertThat(sink.currentSubscriberCount()).isGreaterThan(0);

Diff for: src/test/kotlin/org/springframework/data/redis/core/ReactiveRedisOperationsExtensionsUnitTests.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import org.junit.jupiter.api.Test
2626
import org.springframework.data.redis.connection.DataType
2727
import org.springframework.data.redis.connection.ReactiveSubscription
2828
import org.springframework.data.redis.core.script.RedisScript
29-
import org.springframework.data.redis.listener.ChannelTopic
29+
import org.springframework.data.redis.listener.Topic
3030
import org.springframework.data.redis.serializer.RedisElementReader
3131
import org.springframework.data.redis.serializer.RedisElementWriter
3232
import reactor.core.publisher.Flux
@@ -167,8 +167,8 @@ class ReactiveRedisOperationsExtensionsUnitTests {
167167
@Test // DATAREDIS-1033
168168
fun listenTo() {
169169

170-
val topic1 = ChannelTopic.of("foo")
171-
val topic2 = ChannelTopic.of("bar")
170+
val topic1 = Topic.channel("foo")
171+
val topic2 = Topic.channel("bar")
172172
val message = ReactiveSubscription.ChannelMessage("a", "b")
173173
val operations = mockk<ReactiveRedisOperations<String, String>>()
174174
every { operations.listenTo(any(), any()) } returns Flux.just(message)

0 commit comments

Comments
 (0)