@@ -63,8 +63,8 @@ public class LocalNewSessionQueue extends NewSessionQueue {
63
63
private final EventBus bus ;
64
64
private final Deque <SessionRequest > sessionRequests = new ConcurrentLinkedDeque <>();
65
65
private final ReadWriteLock lock = new ReentrantReadWriteLock (true );
66
- private final ScheduledExecutorService executorService = Executors
67
- .newSingleThreadScheduledExecutor ();
66
+ private final ScheduledExecutorService executorService =
67
+ Executors .newSingleThreadScheduledExecutor ();
68
68
private final Thread shutdownHook = new Thread (this ::callExecutorShutdown );
69
69
70
70
public LocalNewSessionQueue (Tracer tracer , EventBus bus , Duration retryInterval ,
@@ -118,7 +118,7 @@ public boolean offerLast(HttpRequest request, RequestId requestId) {
118
118
addRequestHeaders (request , requestId );
119
119
120
120
attributeMap .put (
121
- AttributeKey .REQUEST_ID .getKey (), EventAttribute .setValue (requestId .toString ()));
121
+ AttributeKey .REQUEST_ID .getKey (), EventAttribute .setValue (requestId .toString ()));
122
122
attributeMap .put ("request.added" , EventAttribute .setValue (added ));
123
123
span .addEvent ("Add new session request to the queue" , attributeMap );
124
124
@@ -147,7 +147,7 @@ public boolean offerFirst(HttpRequest request, RequestId requestId) {
147
147
writeLock .unlock ();
148
148
if (added ) {
149
149
executorService .schedule (() -> retryRequest (sessionRequest ),
150
- super .retryInterval .getSeconds (), TimeUnit .SECONDS );
150
+ super .retryInterval .getSeconds (), TimeUnit .SECONDS );
151
151
}
152
152
}
153
153
}
@@ -164,12 +164,12 @@ private void retryRequest(SessionRequest sessionRequest) {
164
164
} finally {
165
165
writeLock .unlock ();
166
166
bus .fire (new NewSessionRejectedEvent (
167
- new NewSessionErrorResponse (requestId , "New session request timed out" )));
167
+ new NewSessionErrorResponse (requestId , "New session request timed out" )));
168
168
}
169
169
} else {
170
170
LOG .log (Level .INFO ,
171
- "Adding request back to the queue. All slots are busy. Request: {0}" ,
172
- requestId );
171
+ "Adding request back to the queue. All slots are busy. Request: {0}" ,
172
+ requestId );
173
173
bus .fire (new NewSessionRequestEvent (requestId ));
174
174
}
175
175
}
@@ -182,17 +182,17 @@ public Optional<HttpRequest> remove(RequestId id) {
182
182
// Peek the deque and check if the request-id matches. Most cases, it would.
183
183
// If so poll the deque else iterate over the deque and find a match.
184
184
Optional <SessionRequest > firstSessionRequest =
185
- Optional .ofNullable (sessionRequests .peekFirst ());
185
+ Optional .ofNullable (sessionRequests .peekFirst ());
186
186
187
187
Optional <HttpRequest > httpRequest = Optional .empty ();
188
188
if (firstSessionRequest .isPresent ()) {
189
189
if (id .equals (firstSessionRequest .get ().getRequestId ())) {
190
190
httpRequest = Optional .ofNullable (sessionRequests .pollFirst ().getHttpRequest ());
191
191
} else {
192
192
Optional <SessionRequest > matchedRequest = sessionRequests
193
- .stream ()
194
- .filter (sessionRequest -> id .equals (sessionRequest .getRequestId ()))
195
- .findFirst ();
193
+ .stream ()
194
+ .filter (sessionRequest -> id .equals (sessionRequest .getRequestId ()))
195
+ .findFirst ();
196
196
197
197
if (matchedRequest .isPresent ()) {
198
198
SessionRequest sessionRequest = matchedRequest .get ();
@@ -204,7 +204,7 @@ public Optional<HttpRequest> remove(RequestId id) {
204
204
205
205
if (httpRequest .isPresent () && hasRequestTimedOut (httpRequest .get ())) {
206
206
bus .fire (new NewSessionRejectedEvent (
207
- new NewSessionErrorResponse (id , "New session request timed out" )));
207
+ new NewSessionErrorResponse (id , "New session request timed out" )));
208
208
return Optional .empty ();
209
209
}
210
210
return httpRequest ;
@@ -224,8 +224,8 @@ public int clear() {
224
224
sessionRequest = sessionRequests .poll ()) {
225
225
count ++;
226
226
NewSessionErrorResponse errorResponse =
227
- new NewSessionErrorResponse (sessionRequest .getRequestId (),
228
- "New session request cancelled." );
227
+ new NewSessionErrorResponse (sessionRequest .getRequestId (),
228
+ "New session request cancelled." );
229
229
230
230
bus .fire (new NewSessionRejectedEvent (errorResponse ));
231
231
}
0 commit comments