Skip to content

Commit ce0f321

Browse files
pdoliflhotari
authored andcommitted
[fix][test] Fix flaky NonPersistentTopicTest.testMsgDropStat (#24134)
(cherry picked from commit acec48c)
1 parent 46750fb commit ce0f321

File tree

1 file changed

+11
-4
lines changed

1 file changed

+11
-4
lines changed

Diff for: pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java

+11-4
Original file line numberDiff line numberDiff line change
@@ -844,14 +844,15 @@ public void testMsgDropStat() throws Exception {
844844
.messageRoutingMode(MessageRoutingMode.SinglePartition)
845845
.create();
846846
@Cleanup("shutdownNow")
847-
ExecutorService executor = Executors.newFixedThreadPool(5);
847+
ExecutorService executor = Executors.newFixedThreadPool(10);
848848
byte[] msgData = "testData".getBytes();
849849
final int totalProduceMessages = 1000;
850850
CountDownLatch latch = new CountDownLatch(1);
851851
AtomicInteger messagesSent = new AtomicInteger(0);
852852
for (int i = 0; i < totalProduceMessages; i++) {
853853
executor.submit(() -> {
854-
producer.sendAsync(msgData).handle((msgId, e) -> {
854+
try {
855+
MessageId msgId = producer.send(msgData);
855856
int count = messagesSent.incrementAndGet();
856857
// process at least 20% of messages before signalling the latch
857858
// a non-persistent message will return entryId as -1 when it has been dropped
@@ -861,8 +862,14 @@ public void testMsgDropStat() throws Exception {
861862
&& ((MessageIdImpl) msgId).getEntryId() == -1) {
862863
latch.countDown();
863864
}
864-
return null;
865-
});
865+
866+
Thread.sleep(10);
867+
} catch (PulsarClientException e) {
868+
throw new RuntimeException(e);
869+
} catch (InterruptedException e) {
870+
Thread.currentThread().interrupt();
871+
throw new RuntimeException(e);
872+
}
866873
});
867874
}
868875
assertTrue(latch.await(5, TimeUnit.SECONDS));

0 commit comments

Comments
 (0)