|
19 | 19 | import java.time.Duration;
|
20 | 20 | import java.util.List;
|
21 | 21 |
|
22 |
| -import org.junit.jupiter.api.Disabled; |
23 | 22 | import org.junit.jupiter.api.Test;
|
24 | 23 | import reactor.core.publisher.Flux;
|
25 | 24 | import reactor.test.StepVerifier;
|
26 | 25 |
|
| 26 | +import org.springframework.cloud.fn.consumer.redis.RedisTestContainerSupport; |
27 | 27 | import org.springframework.integration.IntegrationMessageHeaderAccessor;
|
28 | 28 | import org.springframework.integration.redis.store.RedisMessageStore;
|
29 | 29 | import org.springframework.integration.support.MessageBuilder;
|
30 | 30 | import org.springframework.messaging.Message;
|
| 31 | +import org.springframework.test.context.DynamicPropertyRegistry; |
| 32 | +import org.springframework.test.context.DynamicPropertySource; |
31 | 33 | import org.springframework.test.context.TestPropertySource;
|
32 | 34 |
|
33 | 35 | import static org.assertj.core.api.Assertions.assertThat;
|
|
36 | 38 | * @author Artem Bilan
|
37 | 39 | */
|
38 | 40 | @TestPropertySource(properties = "aggregator.message-store-type=redis")
|
39 |
| -@Disabled("Needs real Redis Server to be run") // TODO add redis test container |
40 |
| -public class RedisMessageStoreAggregatorTests extends AbstractAggregatorFunctionTests { |
| 41 | +public class RedisMessageStoreAggregatorTests extends AbstractAggregatorFunctionTests implements RedisTestContainerSupport { |
| 42 | + @DynamicPropertySource |
| 43 | + static void redisProperties(DynamicPropertyRegistry registry) { |
| 44 | + registry.add("spring.data.redis.url", RedisTestContainerSupport::getUri); |
| 45 | + } |
41 | 46 |
|
42 | 47 | @Test
|
43 | 48 | public void test() {
|
44 | 49 | Flux<Message<?>> input =
|
45 |
| - Flux.just(MessageBuilder.withPayload("2") |
46 |
| - .setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "my_correlation") |
47 |
| - .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 2) |
48 |
| - .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 2) |
49 |
| - .build(), |
50 |
| - MessageBuilder.withPayload("1") |
51 |
| - .setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "my_correlation") |
52 |
| - .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 1) |
53 |
| - .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 2) |
54 |
| - .build()); |
| 50 | + Flux.just(MessageBuilder.withPayload("2") |
| 51 | + .setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "my_correlation") |
| 52 | + .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 2) |
| 53 | + .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 2) |
| 54 | + .build(), |
| 55 | + MessageBuilder.withPayload("1") |
| 56 | + .setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "my_correlation") |
| 57 | + .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 1) |
| 58 | + .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 2) |
| 59 | + .build()); |
55 | 60 |
|
56 | 61 | Flux<Message<?>> output = this.aggregatorFunction.apply(input);
|
57 | 62 |
|
58 | 63 | output.as(StepVerifier::create)
|
59 |
| - .assertNext((message) -> |
60 |
| - assertThat(message) |
61 |
| - .extracting(Message::getPayload) |
62 |
| - .isInstanceOf(List.class) |
63 |
| - .asList() |
64 |
| - .hasSize(2) |
65 |
| - .contains("1", "2")) |
66 |
| - .thenCancel() |
67 |
| - .verify(Duration.ofSeconds(10)); |
| 64 | + .assertNext((message) -> |
| 65 | + assertThat(message) |
| 66 | + .extracting(Message::getPayload) |
| 67 | + .isInstanceOf(List.class) |
| 68 | + .asList() |
| 69 | + .hasSize(2) |
| 70 | + .contains("1", "2")) |
| 71 | + .thenCancel() |
| 72 | + .verify(Duration.ofSeconds(10)); |
68 | 73 |
|
69 | 74 | assertThat(this.messageGroupStore).isInstanceOf(RedisMessageStore.class);
|
70 | 75 |
|
|
0 commit comments