Skip to content

Commit 802e302

Browse files
committed
Add a test to ensure concurrent access to the event bus is relatively safe
1 parent b611cb9 commit 802e302

File tree

3 files changed

+111
-59
lines changed

3 files changed

+111
-59
lines changed

java/server/src/org/openqa/selenium/events/zeromq/BoundZmqEventBus.java

-2
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,6 @@ class BoundZmqEventBus implements EventBus {
6868
executor.submit(() -> ZMQ.proxy(xsub, xpub, null));
6969

7070
delegate = new UnboundZmqEventBus(context, xpubAddr.advertise, xsubAddr.advertise, secret);
71-
72-
LOG.info("Event bus ready");
7371
}
7472

7573
@Override

java/server/src/org/openqa/selenium/events/zeromq/UnboundZmqEventBus.java

+89-57
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@
4848
import java.util.concurrent.ConcurrentHashMap;
4949
import java.util.concurrent.ExecutorService;
5050
import java.util.concurrent.Executors;
51+
import java.util.concurrent.ScheduledExecutorService;
52+
import java.util.concurrent.ThreadFactory;
53+
import java.util.concurrent.TimeUnit;
5154
import java.util.concurrent.atomic.AtomicBoolean;
5255
import java.util.function.Consumer;
5356
import java.util.logging.Level;
@@ -60,7 +63,9 @@ class UnboundZmqEventBus implements EventBus {
6063
static final EventName REJECTED_EVENT = new EventName("selenium-rejected-event");
6164
private static final Logger LOG = Logger.getLogger(EventBus.class.getName());
6265
private static final Json JSON = new Json();
63-
private final ExecutorService executor;
66+
private final ScheduledExecutorService socketPollingExecutor;
67+
private final ExecutorService listenerNotificationExecutor;
68+
6469
private final Map<EventName, List<Consumer<Event>>> listeners = new ConcurrentHashMap<>();
6570
private final Queue<UUID> recentMessages = EvictingQueue.create(128);
6671
private final String encodedSecret;
@@ -76,12 +81,16 @@ class UnboundZmqEventBus implements EventBus {
7681
}
7782
this.encodedSecret = builder.toString();
7883

79-
executor = Executors.newSingleThreadExecutor(r -> {
84+
ThreadFactory threadFactory = r -> {
8085
Thread thread = new Thread(r);
8186
thread.setName("Event Bus");
8287
thread.setDaemon(true);
8388
return thread;
84-
});
89+
};
90+
this.socketPollingExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
91+
this.listenerNotificationExecutor = Executors.newFixedThreadPool(
92+
Math.max(Runtime.getRuntime().availableProcessors() / 2, 2), // At least two threads
93+
threadFactory);
8594

8695
String connectionMessage = String.format("Connecting to %s and %s", publishConnection, subscribeConnection);
8796
LOG.info(connectionMessage);
@@ -93,6 +102,7 @@ class UnboundZmqEventBus implements EventBus {
93102
.onRetry(e -> LOG.log(Level.WARNING, String.format("Failure #%s. Retrying.", e.getAttemptCount())))
94103
.onRetriesExceeded(e -> LOG.log(Level.WARNING, "Connection aborted."));
95104

105+
// Access to the zmq socket is safe here: no threads.
96106
Failsafe.with(retryPolicy).run(
97107
() -> {
98108
sub = context.createSocket(SocketType.SUB);
@@ -113,54 +123,11 @@ class UnboundZmqEventBus implements EventBus {
113123

114124
AtomicBoolean pollingStarted = new AtomicBoolean(false);
115125

116-
executor.submit(() -> {
117-
LOG.info("Bus started");
118-
while (!Thread.currentThread().isInterrupted()) {
119-
try {
120-
poller.poll(150);
121-
pollingStarted.lazySet(true);
122-
123-
if (poller.pollin(0)) {
124-
ZMQ.Socket socket = poller.getSocket(0);
125-
126-
EventName eventName = new EventName(new String(socket.recv(ZMQ.DONTWAIT), UTF_8));
127-
Secret eventSecret = JSON.toType(new String(socket.recv(ZMQ.DONTWAIT), UTF_8), Secret.class);
128-
UUID id = UUID.fromString(new String(socket.recv(ZMQ.DONTWAIT), UTF_8));
129-
String data = new String(socket.recv(ZMQ.DONTWAIT), UTF_8);
130-
131-
Object converted = JSON.toType(data, Object.class);
132-
Event event = new Event(id, eventName, converted);
133-
134-
if (recentMessages.contains(id)) {
135-
continue;
136-
}
137-
recentMessages.add(id);
138-
139-
if (!Secret.matches(secret, eventSecret)) {
140-
LOG.severe(String.format("Received message without a valid secret. Rejecting. %s -> %s", event, data));
141-
Event rejectedEvent = new Event(REJECTED_EVENT, new ZeroMqEventBus.RejectedEvent(eventName, data));
142-
143-
listeners.getOrDefault(REJECTED_EVENT, new ArrayList<>())
144-
.forEach(listener -> listener.accept(rejectedEvent));
145-
return;
146-
}
147-
148-
List<Consumer<Event>> typeListeners = listeners.get(eventName);
149-
if (typeListeners == null) {
150-
continue;
151-
}
152-
153-
typeListeners.parallelStream().forEach(listener -> listener.accept(event));
154-
}
155-
} catch (Throwable e) {
156-
if (e.getCause() != null && e.getCause() instanceof AssertionError) {
157-
// Do nothing.
158-
} else {
159-
throw e;
160-
}
161-
}
162-
}
163-
});
126+
socketPollingExecutor.scheduleWithFixedDelay(
127+
() -> pollForIncomingEvents(poller, secret, pollingStarted),
128+
0,
129+
100,
130+
TimeUnit.MILLISECONDS);
164131

165132
// Give ourselves up to a second to connect, using The World's Worst heuristic. If we don't
166133
// manage to connect, it's not the end of the world, as the socket we're connecting to may not
@@ -173,11 +140,13 @@ class UnboundZmqEventBus implements EventBus {
173140
throw new RuntimeException(e);
174141
}
175142
}
143+
144+
LOG.info("Event bus ready");
176145
}
177146

178147
@Override
179148
public boolean isReady() {
180-
return !executor.isShutdown();
149+
return !socketPollingExecutor.isShutdown();
181150
}
182151

183152
private boolean isSubAddressIPv6(String connection) {
@@ -207,20 +176,83 @@ public void addListener(EventListener<?> listener) {
207176
public void fire(Event event) {
208177
Require.nonNull("Event to send", event);
209178

210-
pub.sendMore(event.getType().getName().getBytes(UTF_8));
211-
pub.sendMore(encodedSecret.getBytes(UTF_8));
212-
pub.sendMore(event.getId().toString().getBytes(UTF_8));
213-
pub.send(event.getRawData().getBytes(UTF_8));
179+
socketPollingExecutor.execute(() -> {
180+
pub.sendMore(event.getType().getName().getBytes(UTF_8));
181+
pub.sendMore(encodedSecret.getBytes(UTF_8));
182+
pub.sendMore(event.getId().toString().getBytes(UTF_8));
183+
pub.send(event.getRawData().getBytes(UTF_8));
184+
});
214185
}
215186

216187
@Override
217188
public void close() {
218-
executor.shutdown();
189+
socketPollingExecutor.shutdownNow();
190+
listenerNotificationExecutor.shutdownNow();
191+
219192
if (sub != null) {
220193
sub.close();
221194
}
222195
if (pub != null) {
223196
pub.close();
224197
}
225198
}
199+
200+
private void pollForIncomingEvents(ZMQ.Poller poller, Secret secret, AtomicBoolean pollingStarted) {
201+
try {
202+
int count = poller.poll(0);
203+
204+
pollingStarted.lazySet(true);
205+
206+
for (int i = 0; i < count; i++) {
207+
if (poller.pollin(i)) {
208+
ZMQ.Socket socket = poller.getSocket(i);
209+
210+
EventName eventName = new EventName(new String(socket.recv(ZMQ.DONTWAIT), UTF_8));
211+
Secret eventSecret =
212+
JSON.toType(new String(socket.recv(ZMQ.DONTWAIT), UTF_8), Secret.class);
213+
UUID id = UUID.fromString(new String(socket.recv(ZMQ.DONTWAIT), UTF_8));
214+
String data = new String(socket.recv(ZMQ.DONTWAIT), UTF_8);
215+
216+
Object converted = JSON.toType(data, Object.class);
217+
Event event = new Event(id, eventName, converted);
218+
219+
if (recentMessages.contains(id)) {
220+
return;
221+
}
222+
recentMessages.add(id);
223+
224+
if (!Secret.matches(secret, eventSecret)) {
225+
LOG.severe(
226+
String.format(
227+
"Received message without a valid secret. Rejecting. %s -> %s", event, data));
228+
Event rejectedEvent =
229+
new Event(REJECTED_EVENT, new ZeroMqEventBus.RejectedEvent(eventName, data));
230+
231+
notifyListeners(REJECTED_EVENT, rejectedEvent);
232+
233+
return;
234+
}
235+
236+
notifyListeners(eventName, event);
237+
}
238+
}
239+
} catch (Throwable e) {
240+
// if the exception escapes, then we never get scheduled again
241+
LOG.log(Level.WARNING, e, () -> "Caught and swallowed exception: " + e.getMessage());
242+
}
243+
}
244+
245+
private void notifyListeners(EventName eventName, Event event) {
246+
List<Consumer<Event>> eventListeners = listeners.getOrDefault(eventName, new ArrayList<>());
247+
eventListeners.stream().forEach(listener -> {
248+
listenerNotificationExecutor.submit(() -> {
249+
try {
250+
listener.accept(event);
251+
252+
} catch (Throwable t) {
253+
LOG.log(Level.WARNING, t, () -> "Caught exception from listener: " + listener);
254+
}
255+
});
256+
});
257+
}
226258
}

java/server/test/org/openqa/selenium/events/EventBusTest.java

+22
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131

3232
import java.util.Collection;
3333
import java.util.concurrent.CountDownLatch;
34+
import java.util.concurrent.ExecutorService;
35+
import java.util.concurrent.Executors;
3436
import java.util.concurrent.atomic.AtomicInteger;
3537
import java.util.function.Supplier;
3638

@@ -103,4 +105,24 @@ public void shouldNotReceiveEventsNotMeantForTheTopic() {
103105

104106
assertThat(count.get()).isEqualTo(0);
105107
}
108+
109+
@Test
110+
public void shouldBeAbleToFireEventsInParallel() throws InterruptedException {
111+
int maxCount = 100;
112+
EventName name = new EventName("cheese");
113+
114+
CountDownLatch count = new CountDownLatch(maxCount);
115+
bus.addListener(new EventListener<>(name, Object.class, obj -> count.countDown()));
116+
117+
ExecutorService executor = Executors.newCachedThreadPool();
118+
try {
119+
for (int i = 0; i < maxCount; i++) {
120+
executor.submit(() -> bus.fire(new Event(name, "")));
121+
}
122+
123+
assertThat(count.await(20, SECONDS)).describedAs(count.toString()).isTrue();
124+
} finally {
125+
executor.shutdownNow();
126+
}
127+
}
106128
}

0 commit comments

Comments
 (0)