79
79
import java .util .List ;
80
80
import java .util .Map ;
81
81
import java .util .Optional ;
82
+ import java .util .Queue ;
82
83
import java .util .Set ;
84
+ import java .util .concurrent .ConcurrentLinkedQueue ;
85
+ import java .util .concurrent .Executors ;
86
+ import java .util .concurrent .ScheduledExecutorService ;
87
+ import java .util .concurrent .TimeUnit ;
83
88
import java .util .concurrent .locks .Lock ;
84
89
import java .util .concurrent .locks .ReadWriteLock ;
85
90
import java .util .concurrent .locks .ReentrantReadWriteLock ;
@@ -98,6 +103,9 @@ public class LocalDistributor extends Distributor {
98
103
private final Secret registrationSecret ;
99
104
private final Regularly hostChecker = new Regularly ("distributor host checker" );
100
105
private final Map <NodeId , Runnable > allChecks = new HashMap <>();
106
+ private final Queue <RequestId > requestIds = new ConcurrentLinkedQueue <>();
107
+ private final ScheduledExecutorService executorService =
108
+ Executors .newSingleThreadScheduledExecutor ();
101
109
102
110
private final ReadWriteLock lock = new ReentrantReadWriteLock (/* fair */ true );
103
111
private final GridModel model ;
@@ -125,64 +133,92 @@ public LocalDistributor(
125
133
bus .addListener (NodeStatusEvent .listener (this ::register ));
126
134
bus .addListener (NodeStatusEvent .listener (model ::refresh ));
127
135
bus .addListener (NodeDrainComplete .listener (this ::remove ));
136
+ bus .addListener (NewSessionRequestEvent .listener (requestIds ::offer ));
128
137
129
- bus .addListener (NewSessionRequestEvent .listener (reqId -> {
130
- Optional <HttpRequest > sessionRequest = this .sessionRequests .remove (reqId );
131
- // Check if polling the queue did not return null
132
- if (sessionRequest .isPresent ()) {
133
- handleNewSessionRequest (sessionRequest .get (), reqId );
134
- } else {
135
- fireSessionRejectedEvent (
136
- "Unable to poll request from the new session request queue." ,
137
- reqId );
138
- }
139
- }));
138
+ Thread shutdownHook = new Thread (this ::callExecutorShutdown );
139
+ Runtime .getRuntime ().addShutdownHook (shutdownHook );
140
+ NewSessionRunnable runnable = new NewSessionRunnable ();
141
+ executorService .scheduleAtFixedRate (runnable , 0 , 1000 , TimeUnit .MILLISECONDS );
140
142
}
141
143
142
- private void handleNewSessionRequest (HttpRequest sessionRequest , RequestId reqId ) {
143
-
144
- try (Span span = newSpanAsChildOf (tracer , sessionRequest , "distributor.poll_queue" )) {
145
- Map <String , EventAttributeValue > attributeMap = new HashMap <>();
146
- attributeMap .put (
147
- AttributeKey .LOGGER_CLASS .getKey (), EventAttribute .setValue (getClass ().getName ()));
148
- span .setAttribute (AttributeKey .REQUEST_ID .getKey (), reqId .toString ());
149
- attributeMap .put (AttributeKey .REQUEST_ID .getKey (), EventAttribute .setValue (reqId .toString ()));
144
+ public class NewSessionRunnable implements Runnable {
145
+ @ Override
146
+ public void run () {
147
+ Lock writeLock = lock .writeLock ();
148
+ writeLock .lock ();
149
+ try {
150
+ if (!requestIds .isEmpty ()) {
151
+ boolean hasCapacity = nodes
152
+ .keySet ()
153
+ .stream ()
154
+ .anyMatch (key -> nodes .get (key ).getStatus ().hasCapacity ());
155
+ if (hasCapacity ) {
156
+ RequestId reqId = requestIds .poll ();
157
+ if (reqId != null ) {
158
+ Optional <HttpRequest > optionalHttpRequest = sessionRequests .remove (reqId );
159
+ // Check if polling the queue did not return null
160
+ if (optionalHttpRequest .isPresent ()) {
161
+ handleNewSessionRequest (optionalHttpRequest .get (), reqId );
162
+ } else {
163
+ fireSessionRejectedEvent (
164
+ "Unable to poll request from the new session request queue." ,
165
+ reqId );
166
+ }
167
+ }
168
+ }
169
+ }
170
+ } finally {
171
+ writeLock .unlock ();
172
+ }
173
+ }
150
174
151
- attributeMap .put ("request" , EventAttribute .setValue (sessionRequest .toString ()));
152
- Either <SessionNotCreatedException , CreateSessionResponse > response =
175
+ private void handleNewSessionRequest (HttpRequest sessionRequest , RequestId reqId ) {
176
+ try (Span span = newSpanAsChildOf (tracer , sessionRequest , "distributor.poll_queue" )) {
177
+ Map <String , EventAttributeValue > attributeMap = new HashMap <>();
178
+ attributeMap .put (
179
+ AttributeKey .LOGGER_CLASS .getKey (),
180
+ EventAttribute .setValue (getClass ().getName ()));
181
+ span .setAttribute (AttributeKey .REQUEST_ID .getKey (), reqId .toString ());
182
+ attributeMap .put (
183
+ AttributeKey .REQUEST_ID .getKey (),
184
+ EventAttribute .setValue (reqId .toString ()));
185
+
186
+ attributeMap .put ("request" , EventAttribute .setValue (sessionRequest .toString ()));
187
+ Either <SessionNotCreatedException , CreateSessionResponse > response =
153
188
newSession (sessionRequest );
154
- if (response .isRight ()) {
155
- CreateSessionResponse sessionResponse = response .right ();
156
- NewSessionResponse newSessionResponse =
189
+ if (response .isRight ()) {
190
+ CreateSessionResponse sessionResponse = response .right ();
191
+ NewSessionResponse newSessionResponse =
157
192
new NewSessionResponse (
158
- reqId ,
159
- sessionResponse .getSession (),
160
- sessionResponse .getDownstreamEncodedResponse ());
193
+ reqId ,
194
+ sessionResponse .getSession (),
195
+ sessionResponse .getDownstreamEncodedResponse ());
161
196
162
- bus .fire (new NewSessionResponseEvent (newSessionResponse ));
163
- } else {
164
- SessionNotCreatedException exception = response .left ();
197
+ bus .fire (new NewSessionResponseEvent (newSessionResponse ));
198
+ } else {
199
+ SessionNotCreatedException exception = response .left ();
165
200
166
- if (exception instanceof RetrySessionRequestException ) {
167
- boolean retried = this . sessionRequests .retryAddToQueue (sessionRequest , reqId );
201
+ if (exception instanceof RetrySessionRequestException ) {
202
+ boolean retried = sessionRequests .retryAddToQueue (sessionRequest , reqId );
168
203
169
- attributeMap .put ("request.retry_add" , EventAttribute .setValue (retried ));
170
- span .addEvent ("Retry adding to front of queue. All slots are busy ." , attributeMap );
204
+ attributeMap .put ("request.retry_add" , EventAttribute .setValue (retried ));
205
+ span .addEvent ("Retry adding to front of queue. No slot available ." , attributeMap );
171
206
172
- if (!retried ) {
173
- span .addEvent ("Retry adding to front of queue failed." , attributeMap );
207
+ if (!retried ) {
208
+ span .addEvent ("Retry adding to front of queue failed." , attributeMap );
209
+ fireSessionRejectedEvent (exception .getMessage (), reqId );
210
+ }
211
+ } else {
174
212
fireSessionRejectedEvent (exception .getMessage (), reqId );
175
213
}
176
- } else {
177
- fireSessionRejectedEvent (exception .getMessage (), reqId );
178
214
}
179
215
}
180
216
}
181
- }
182
217
183
- private void fireSessionRejectedEvent (String message , RequestId reqId ) {
184
- bus .fire (
218
+ private void fireSessionRejectedEvent (String message , RequestId reqId ) {
219
+ bus .fire (
185
220
new NewSessionRejectedEvent (new NewSessionErrorResponse (reqId , message )));
221
+ }
186
222
}
187
223
188
224
public static Distributor create (Config config ) {
@@ -393,4 +429,9 @@ protected Supplier<CreateSessionResponse> reserve(SlotId slotId, CreateSessionRe
393
429
writeLock .unlock ();
394
430
}
395
431
}
432
+
433
+ public void callExecutorShutdown () {
434
+ LOG .info ("Shutting down Distributor executor service" );
435
+ executorService .shutdownNow ();
436
+ }
396
437
}
0 commit comments