Skip to content

Commit 647a7fa

Browse files
committed
Fix race in test
CorrelationData must have an id when correlating returns and confirms. With no id; there is a race to deliver the confirm and return.
1 parent c512597 commit 647a7fa

File tree

1 file changed

+8
-17
lines changed

1 file changed

+8
-17
lines changed

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplatePublisherCallbacksIntegrationTests3.java

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,10 @@ public void testRepublishOnNackThreadNoExchange() throws Exception {
6262
template.setConfirmCallback((cd, a, c) -> {
6363
confirmLatch.countDown();
6464
if (confirmLatch.getCount() == 1) {
65-
template.convertAndSend(QUEUE1, ((MyCD) cd).payload);
65+
template.convertAndSend(QUEUE1, cd.getId());
6666
}
6767
});
68-
template.convertAndSend("bad.exchange", "junk", "foo", new MyCD("foo"));
68+
template.convertAndSend("bad.exchange", "junk", "foo", new CorrelationData("foo"));
6969
assertThat(confirmLatch.await(10, TimeUnit.SECONDS)).isTrue();
7070
assertThat(template.receive(QUEUE1, 10_000)).isNotNull();
7171
}
@@ -97,7 +97,8 @@ public void testDeferredChannelCacheNack() throws Exception {
9797
channel2.close();
9898
conn.close();
9999
assertThat(TestUtils.getPropertyValue(cf, "cachedChannelsNonTransactional", List.class).size()).isEqualTo(2);
100-
template.convertAndSend("", QUEUE2 + "junk", "foo", new MyCD("foo"));
100+
CorrelationData correlationData = new CorrelationData("foo");
101+
template.convertAndSend("", QUEUE2 + "junk", "foo", correlationData);
101102
assertThat(returnLatch.await(10, TimeUnit.SECONDS)).isTrue();
102103
assertThat(confirmLatch.await(10, TimeUnit.SECONDS)).isTrue();
103104
int n = 0;
@@ -106,6 +107,7 @@ public void testDeferredChannelCacheNack() throws Exception {
106107
}
107108
assertThat(cacheCount.get()).isEqualTo(1);
108109
assertThat(returnCalledFirst.get()).isTrue();
110+
assertThat(correlationData.getReturnedMessage()).isNotNull();
109111
cf.destroy();
110112
}
111113

@@ -129,7 +131,7 @@ public void testDeferredChannelCacheAck() throws Exception {
129131
channel2.close();
130132
conn.close();
131133
assertThat(TestUtils.getPropertyValue(cf, "cachedChannelsNonTransactional", List.class).size()).isEqualTo(2);
132-
template.convertAndSend("", QUEUE2, "foo", new MyCD("foo"));
134+
template.convertAndSend("", QUEUE2, "foo", new CorrelationData("foo"));
133135
assertThat(confirmLatch.await(10, TimeUnit.SECONDS)).isTrue();
134136
assertThat(cacheCount.get()).isEqualTo(1);
135137
cf.destroy();
@@ -146,22 +148,11 @@ public void testTwoSendsAndReceivesDRTMLC() throws Exception {
146148
template.setConfirmCallback((cd, a, c) -> {
147149
confirmLatch.countDown();
148150
});
149-
template.convertSendAndReceive("", QUEUE3, "foo", new MyCD("foo"));
150-
template.convertSendAndReceive("", QUEUE3, "foo", new MyCD("foo")); // listener not registered
151+
template.convertSendAndReceive("", QUEUE3, "foo", new CorrelationData("foo"));
152+
template.convertSendAndReceive("", QUEUE3, "foo", new CorrelationData("foo")); // listener not registered
151153
assertThat(confirmLatch.await(10, TimeUnit.SECONDS)).isTrue();
152154
assertThat(template.receive(QUEUE3, 10_000)).isNotNull();
153155
assertThat(template.receive(QUEUE3, 10_000)).isNotNull();
154156
}
155157

156-
157-
private static class MyCD extends CorrelationData {
158-
159-
final String payload;
160-
161-
MyCD(String payload) {
162-
this.payload = payload;
163-
}
164-
165-
}
166-
167158
}

0 commit comments

Comments
 (0)