Skip to content

Commit 16b3555

Browse files
pujaganiAutomatedTesterdiemol
authored
[grid] Send node heartbeat only on successful node registration (#9213)
* [grid] Send node heartbeat only on successful node registration * [grid] Update node registration test to wait for heartbeat event Co-authored-by: David Burns <[email protected]> Co-authored-by: Diego Molina <[email protected]>
1 parent 9c41b0b commit 16b3555

File tree

3 files changed

+62
-2
lines changed

3 files changed

+62
-2
lines changed

java/server/src/org/openqa/selenium/grid/node/httpd/NodeServer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ public NettyServer start() {
197197
Executors.newSingleThreadExecutor().submit(() -> {
198198
Failsafe.with(registrationPolicy).run(
199199
() -> {
200-
LOG.fine("Sending registration event");
200+
LOG.info("Sending registration event");
201201
HealthCheck.Result check = node.getHealthCheck().check();
202202
if (DOWN.equals(check.getAvailability())) {
203203
LOG.severe("Node is not alive: " + check.getMessage());

java/server/src/org/openqa/selenium/grid/node/local/LocalNode.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.openqa.selenium.grid.data.Availability;
3737
import org.openqa.selenium.grid.data.CreateSessionRequest;
3838
import org.openqa.selenium.grid.data.CreateSessionResponse;
39+
import org.openqa.selenium.grid.data.NodeAddedEvent;
3940
import org.openqa.selenium.grid.data.NodeDrainComplete;
4041
import org.openqa.selenium.grid.data.NodeDrainStarted;
4142
import org.openqa.selenium.grid.data.NodeHeartBeatEvent;
@@ -171,7 +172,13 @@ private LocalNode(
171172
this.regularly = new Regularly("Local Node: " + externalUri);
172173
regularly.submit(currentSessions::cleanUp, Duration.ofSeconds(30), Duration.ofSeconds(30));
173174
regularly.submit(tempFileSystems::cleanUp, Duration.ofSeconds(30), Duration.ofSeconds(30));
174-
regularly.submit(() -> bus.fire(new NodeHeartBeatEvent(getId())), heartbeatPeriod, heartbeatPeriod);
175+
176+
bus.addListener(NodeAddedEvent.listener(nodeId -> {
177+
if (getId().equals(nodeId)) {
178+
regularly.submit(() ->
179+
bus.fire(new NodeHeartBeatEvent(getId())), heartbeatPeriod, heartbeatPeriod);
180+
}
181+
}));
175182

176183
bus.addListener(SessionClosedEvent.listener(id -> {
177184
try {

java/server/test/org/openqa/selenium/grid/distributor/DistributorTest.java

+53
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.common.collect.ImmutableMap;
2121
import com.google.common.collect.ImmutableSet;
2222
import org.assertj.core.api.AbstractAssert;
23+
import org.junit.Assert;
2324
import org.junit.Before;
2425
import org.junit.Ignore;
2526
import org.junit.Test;
@@ -35,6 +36,7 @@
3536
import org.openqa.selenium.grid.data.CreateSessionRequest;
3637
import org.openqa.selenium.grid.data.CreateSessionResponse;
3738
import org.openqa.selenium.grid.data.DistributorStatus;
39+
import org.openqa.selenium.grid.data.NodeHeartBeatEvent;
3840
import org.openqa.selenium.grid.data.NodeRemovedEvent;
3941
import org.openqa.selenium.grid.data.NodeStatus;
4042
import org.openqa.selenium.grid.data.Session;
@@ -81,6 +83,7 @@
8183
import java.util.UUID;
8284
import java.util.concurrent.CountDownLatch;
8385
import java.util.concurrent.TimeUnit;
86+
import java.util.concurrent.atomic.AtomicBoolean;
8487
import java.util.concurrent.atomic.AtomicReference;
8588
import java.util.logging.Logger;
8689
import java.util.stream.Collectors;
@@ -148,6 +151,56 @@ public void creatingANewSessionWithoutANodeEndsInFailure() {
148151
}
149152
}
150153

154+
@Test
155+
public void shouldStartHeartBeatOnNodeRegistration() {
156+
EventBus bus = new GuavaEventBus();
157+
LocalSessionMap sessions = new LocalSessionMap(tracer, bus);
158+
LocalNewSessionQueue localNewSessionQueue = new LocalNewSessionQueue(
159+
tracer,
160+
bus,
161+
Duration.ofSeconds(2),
162+
Duration.ofSeconds(2));
163+
LocalNewSessionQueuer queuer = new LocalNewSessionQueuer(
164+
tracer,
165+
bus,
166+
localNewSessionQueue,
167+
registrationSecret);
168+
LocalNode node = LocalNode.builder(tracer, bus, routableUri, routableUri, registrationSecret)
169+
.add(
170+
caps,
171+
new TestSessionFactory((id, c) -> new Session(id, nodeUri, stereotype, c, Instant.now())))
172+
.build();
173+
174+
Distributor distributor = new LocalDistributor(
175+
tracer,
176+
bus,
177+
new PassthroughHttpClient.Factory(node),
178+
sessions,
179+
queuer,
180+
registrationSecret);
181+
distributor.add(node);
182+
183+
AtomicBoolean heartbeatStarted = new AtomicBoolean();
184+
CountDownLatch latch = new CountDownLatch(1);
185+
186+
bus.addListener(NodeHeartBeatEvent.listener(nodeId -> {
187+
latch.countDown();
188+
if (node.getId().equals(nodeId)) {
189+
heartbeatStarted.set(true);
190+
}
191+
}));
192+
waitToHaveCapacity(distributor);
193+
boolean eventFired = false;
194+
try {
195+
eventFired = latch.await(30, TimeUnit.SECONDS);
196+
} catch (InterruptedException e) {
197+
Assert.fail("Thread Interrupted");
198+
}
199+
200+
assertThat(eventFired).isTrue();
201+
assertThat(heartbeatStarted).isTrue();
202+
}
203+
151204
@Test
152205
public void shouldBeAbleToAddANodeAndCreateASession() {
153206
LocalSessionMap sessions = new LocalSessionMap(tracer, bus);

0 commit comments

Comments
 (0)