Skip to content

Commit b54ff65

Browse files
pujaganidiemol
andauthored
[grid] Make event bus listener for Queuer and LocalQueue thread-safe (#9161)
* [grid] Make event bus listener for Queuer and LocalQueue thread-safe * [grid] Make LocalNewSessionQueuer use single instance of GetNewSessionResponse. Co-authored-by: Diego Molina <[email protected]>
1 parent 2e7f6e7 commit b54ff65

File tree

3 files changed

+70
-67
lines changed

3 files changed

+70
-67
lines changed

java/server/src/org/openqa/selenium/grid/sessionqueue/GetNewSessionResponse.java

+42-38
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@
3434
import java.util.UUID;
3535
import java.util.concurrent.ConcurrentHashMap;
3636
import java.util.concurrent.CountDownLatch;
37+
import java.util.concurrent.locks.Lock;
38+
import java.util.concurrent.locks.ReadWriteLock;
39+
import java.util.concurrent.locks.ReentrantReadWriteLock;
3740
import java.util.logging.Level;
3841
import java.util.logging.Logger;
3942

@@ -48,59 +51,60 @@ public class GetNewSessionResponse {
4851
private final EventBus bus;
4952
private final Tracer tracer;
5053
private final NewSessionQueue sessionRequests;
51-
private final Map<RequestId, NewSessionRequest> knownRequests = new ConcurrentHashMap<>();
54+
private static final Map<RequestId, NewSessionRequest> knownRequests = new ConcurrentHashMap<>();
55+
private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
5256

5357
public GetNewSessionResponse(Tracer tracer, EventBus bus,
5458
NewSessionQueue sessionRequests) {
5559
this.tracer = Require.nonNull("Tracer", tracer);
5660
this.bus = Require.nonNull("Event bus", bus);
5761
this.sessionRequests = Require.nonNull("New Session Request Queue", sessionRequests);
5862

59-
this.bus.addListener(NewSessionResponseEvent.listener(sessionResponse -> {
60-
try {
61-
this.setResponse(sessionResponse);
62-
} catch (Exception ignore) {
63-
// Ignore any exception. Do not want to block the eventbus thread.
64-
}
65-
}));
63+
this.bus.addListener(NewSessionResponseEvent.listener(this::setResponse));
6664

67-
this.bus.addListener(NewSessionRejectedEvent.listener(sessionResponse -> {
68-
try {
69-
this.setErrorResponse(sessionResponse);
70-
} catch (Exception ignore) {
71-
// Ignore any exception. Do not want to block the eventbus thread.
72-
}
73-
}));
65+
this.bus.addListener(NewSessionRejectedEvent.listener(this::setErrorResponse));
7466
}
7567

7668
private void setResponse(NewSessionResponse sessionResponse) {
77-
// Each thread will get its own CountDownLatch and it is stored in the Map using request id as the key.
78-
// EventBus thread will retrieve the same request and set it's response and unblock waiting request thread.
79-
RequestId id = sessionResponse.getRequestId();
80-
Optional<NewSessionRequest> sessionRequest = Optional.ofNullable(knownRequests.get(id));
81-
82-
if (sessionRequest.isPresent()) {
83-
NewSessionRequest request = sessionRequest.get();
84-
request.setSessionResponse(
85-
new HttpResponse().setContent(bytes(sessionResponse.getDownstreamEncodedResponse())));
86-
request.getLatch().countDown();
69+
Lock writeLock = lock.writeLock();
70+
writeLock.lock();
71+
try {
72+
// Each thread will get its own CountDownLatch and it is stored in the Map using request id as the key.
73+
// EventBus thread will retrieve the same request and set it's response and unblock waiting request thread.
74+
RequestId id = sessionResponse.getRequestId();
75+
Optional<NewSessionRequest> sessionRequest = Optional.ofNullable(knownRequests.get(id));
76+
77+
if (sessionRequest.isPresent()) {
78+
NewSessionRequest request = sessionRequest.get();
79+
request.setSessionResponse(
80+
new HttpResponse().setContent(bytes(sessionResponse.getDownstreamEncodedResponse())));
81+
request.getLatch().countDown();
82+
}
83+
} finally {
84+
writeLock.unlock();
8785
}
8886
}
8987

9088
private void setErrorResponse(NewSessionErrorResponse sessionResponse) {
91-
RequestId id = sessionResponse.getRequestId();
92-
Optional<NewSessionRequest> sessionRequest = Optional.ofNullable(knownRequests.get(id));
93-
94-
// There could be a situation where the session request in the queue is scheduled for retry.
95-
// Meanwhile the request queue is cleared.
96-
// This will fire a error response event and remove the request id from the knownRequests map.
97-
// Another error response event will be fired by the Distributor when the request is retried.
98-
// Since a response is already provided for the request, the event listener should not take any action.
99-
100-
if (sessionRequest.isPresent()) {
101-
NewSessionRequest request = sessionRequest.get();
102-
request.setSessionResponse(internalErrorResponse(sessionResponse.getMessage()));
103-
request.getLatch().countDown();
89+
Lock writeLock = lock.writeLock();
90+
writeLock.lock();
91+
try {
92+
RequestId id = sessionResponse.getRequestId();
93+
Optional<NewSessionRequest> sessionRequest = Optional.ofNullable(knownRequests.get(id));
94+
95+
// There could be a situation where the session request in the queue is scheduled for retry.
96+
// Meanwhile the request queue is cleared.
97+
// This will fire a error response event and remove the request id from the knownRequests map.
98+
// Another error response event will be fired by the Distributor when the request is retried.
99+
// Since a response is already provided for the request, the event listener should not take any action.
100+
101+
if (sessionRequest.isPresent()) {
102+
NewSessionRequest request = sessionRequest.get();
103+
request.setSessionResponse(internalErrorResponse(sessionResponse.getMessage()));
104+
request.getLatch().countDown();
105+
}
106+
} finally {
107+
writeLock.unlock();
104108
}
105109
}
106110

java/server/src/org/openqa/selenium/grid/sessionqueue/local/LocalNewSessionQueue.java

+25-27
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,6 @@ public boolean offerLast(HttpRequest request, RequestId requestId) {
138138
Require.nonNull("New Session request", request);
139139

140140
Span span = tracer.getCurrentContext().createSpan("local_sessionqueue.add");
141-
boolean added = false;
142-
SessionRequest sessionRequest = new SessionRequest(requestId, request);
143141

144142
Lock writeLock = lock.writeLock();
145143
writeLock.lock();
@@ -148,63 +146,63 @@ public boolean offerLast(HttpRequest request, RequestId requestId) {
148146
attributeMap.put(AttributeKey.LOGGER_CLASS.getKey(),
149147
EventAttribute.setValue(getClass().getName()));
150148

151-
added = sessionRequests.offerLast(sessionRequest);
149+
SessionRequest sessionRequest = new SessionRequest(requestId, request);
152150
addRequestHeaders(request, requestId);
151+
boolean added = sessionRequests.offerLast(sessionRequest);
153152

154153
attributeMap.put(
155154
AttributeKey.REQUEST_ID.getKey(), EventAttribute.setValue(requestId.toString()));
156155
attributeMap.put("request.added", EventAttribute.setValue(added));
157156
span.addEvent("Add new session request to the queue", attributeMap);
158157

158+
if (added) {
159+
bus.fire(new NewSessionRequestEvent(requestId));
160+
}
161+
159162
return added;
160163
} finally {
161164
writeLock.unlock();
162165
span.close();
163-
if (added) {
164-
bus.fire(new NewSessionRequestEvent(requestId));
165-
}
166166
}
167167
}
168168

169169
@Override
170170
public boolean offerFirst(HttpRequest request, RequestId requestId) {
171171
Require.nonNull("New Session request", request);
172-
boolean added = false;
173-
SessionRequest sessionRequest = new SessionRequest(requestId, request);
174-
175172
Lock writeLock = lock.writeLock();
176173
writeLock.lock();
177174
try {
178-
added = sessionRequests.offerFirst(sessionRequest);
179-
return added;
180-
} finally {
181-
writeLock.unlock();
175+
SessionRequest sessionRequest = new SessionRequest(requestId, request);
176+
boolean added = sessionRequests.offerFirst(sessionRequest);
182177
if (added) {
183178
executorService.schedule(() -> retryRequest(sessionRequest),
184179
super.retryInterval.getSeconds(), TimeUnit.SECONDS);
185180
}
181+
return added;
182+
} finally {
183+
writeLock.unlock();
186184
}
187185
}
188186

189187
private void retryRequest(SessionRequest sessionRequest) {
190-
HttpRequest request = sessionRequest.getHttpRequest();
191-
RequestId requestId = sessionRequest.getRequestId();
192-
if (hasRequestTimedOut(request)) {
193-
LOG.log(Level.INFO, "Request {0} timed out", requestId);
194-
Lock writeLock = lock.writeLock();
195-
writeLock.lock();
196-
try {
188+
Lock writeLock = lock.writeLock();
189+
writeLock.lock();
190+
try {
191+
HttpRequest request = sessionRequest.getHttpRequest();
192+
RequestId requestId = sessionRequest.getRequestId();
193+
if (hasRequestTimedOut(request)) {
194+
LOG.log(Level.INFO, "Request {0} timed out", requestId);
197195
sessionRequests.remove(sessionRequest);
198-
} finally {
199-
writeLock.unlock();
200196
bus.fire(new NewSessionRejectedEvent(
201197
new NewSessionErrorResponse(requestId, "New session request timed out")));
198+
} else {
199+
LOG.log(Level.INFO,
200+
"Adding request back to the queue. All slots are busy. Request: {0}",
201+
requestId);
202+
bus.fire(new NewSessionRequestEvent(requestId));
202203
}
203-
} else {
204-
LOG.log(Level.INFO,
205-
"Adding request back to the queue. All slots are busy. Request: {0}",
206-
requestId);
207-
bus.fire(new NewSessionRequestEvent(requestId));
204+
} finally {
205+
writeLock.unlock();
208206
}
209207
}
210208

java/server/src/org/openqa/selenium/grid/sessionqueue/local/LocalNewSessionQueuer.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class LocalNewSessionQueuer extends NewSessionQueuer {
4141

4242
public final NewSessionQueue sessionRequests;
4343
private final EventBus bus;
44+
private final GetNewSessionResponse getNewSessionResponse;
4445

4546
public LocalNewSessionQueuer(
4647
Tracer tracer,
@@ -50,6 +51,8 @@ public LocalNewSessionQueuer(
5051
super(tracer, registrationSecret);
5152
this.bus = Require.nonNull("Event bus", bus);
5253
this.sessionRequests = Require.nonNull("New Session Request Queue", sessionRequests);
54+
55+
this.getNewSessionResponse = new GetNewSessionResponse(tracer, bus, sessionRequests);
5356
}
5457

5558
public static NewSessionQueuer create(Config config) {
@@ -72,8 +75,6 @@ public static NewSessionQueuer create(Config config) {
7275
@Override
7376
public HttpResponse addToQueue(HttpRequest request) {
7477
validateSessionRequest(request);
75-
GetNewSessionResponse getNewSessionResponse =
76-
new GetNewSessionResponse(tracer, bus, sessionRequests);
7778
return getNewSessionResponse.add(request);
7879
}
7980

0 commit comments

Comments
 (0)