31
31
import org .openqa .selenium .grid .data .CreateSessionRequest ;
32
32
import org .openqa .selenium .grid .data .CreateSessionResponse ;
33
33
import org .openqa .selenium .grid .data .DistributorStatus ;
34
- import org .openqa .selenium .grid .data .NewSessionRequestEvent ;
35
34
import org .openqa .selenium .grid .data .NodeAddedEvent ;
36
35
import org .openqa .selenium .grid .data .NodeDrainComplete ;
37
36
import org .openqa .selenium .grid .data .NodeHeartBeatEvent ;
72
71
import org .openqa .selenium .remote .tracing .Tracer ;
73
72
import org .openqa .selenium .status .HasReadyState ;
74
73
74
+ import java .io .Closeable ;
75
75
import java .io .UncheckedIOException ;
76
76
import java .time .Duration ;
77
77
import java .util .ArrayList ;
101
101
import static org .openqa .selenium .remote .tracing .AttributeKey .SESSION_URI ;
102
102
import static org .openqa .selenium .remote .tracing .Tags .EXCEPTION ;
103
103
104
- public class LocalDistributor extends Distributor {
104
+ public class LocalDistributor extends Distributor implements Closeable {
105
105
106
106
private static final Logger LOG = Logger .getLogger (LocalDistributor .class .getName ());
107
107
@@ -112,6 +112,7 @@ public class LocalDistributor extends Distributor {
112
112
private final SlotSelector slotSelector ;
113
113
private final Secret registrationSecret ;
114
114
private final Regularly hostChecker = new Regularly ("distributor host checker" );
115
+ private final Regularly purgeDeadNodes = new Regularly ("Purge deadNodes" );
115
116
private final Map <NodeId , Runnable > allChecks = new HashMap <>();
116
117
private final Duration healthcheckInterval ;
117
118
@@ -120,7 +121,7 @@ public class LocalDistributor extends Distributor {
120
121
private final Map <NodeId , Node > nodes ;
121
122
122
123
private final NewSessionQueue sessionQueue ;
123
- private final Regularly regularly ;
124
+ private final Regularly createNewSession ;
124
125
125
126
private final boolean rejectUnsupportedCaps ;
126
127
@@ -158,7 +159,7 @@ public LocalDistributor(
158
159
}
159
160
}));
160
161
161
- regularly = new Regularly (
162
+ createNewSession = new Regularly (
162
163
Executors .newSingleThreadScheduledExecutor (
163
164
r -> {
164
165
Thread thread = new Thread (r );
@@ -169,10 +170,9 @@ public LocalDistributor(
169
170
170
171
NewSessionRunnable newSessionRunnable = new NewSessionRunnable ();
171
172
bus .addListener (NodeDrainComplete .listener (this ::remove ));
172
- bus .addListener (NewSessionRequestEvent .listener (ignored -> newSessionRunnable .run ()));
173
173
174
- regularly .submit (model ::purgeDeadNodes , Duration .ofSeconds (30 ), Duration .ofSeconds (30 ));
175
- regularly .submit (newSessionRunnable , Duration .ofSeconds (5 ), Duration .ofSeconds (5 ));
174
+ purgeDeadNodes .submit (model ::purgeDeadNodes , Duration .ofSeconds (30 ), Duration .ofSeconds (30 ));
175
+ createNewSession .submit (newSessionRunnable , Duration .ofSeconds (5 ), Duration .ofSeconds (5 ));
176
176
}
177
177
178
178
public static Distributor create (Config config ) {
@@ -541,9 +541,12 @@ private boolean reserve(SlotId id) {
541
541
}
542
542
}
543
543
544
- public void callExecutorShutdown () {
544
+ @ Override
545
+ public void close () {
545
546
LOG .info ("Shutting down Distributor executor service" );
546
- regularly .shutdown ();
547
+ purgeDeadNodes .shutdown ();
548
+ hostChecker .shutdown ();
549
+ createNewSession .shutdown ();
547
550
}
548
551
549
552
public class NewSessionRunnable implements Runnable {
0 commit comments