|
79 | 79 | import java.util.Optional;
|
80 | 80 | import java.util.Queue;
|
81 | 81 | import java.util.Set;
|
| 82 | +import java.util.concurrent.ConcurrentHashMap; |
82 | 83 | import java.util.concurrent.ConcurrentLinkedQueue;
|
83 | 84 | import java.util.concurrent.Executors;
|
84 | 85 | import java.util.concurrent.ScheduledExecutorService;
|
@@ -130,14 +131,20 @@ public LocalDistributor(
|
130 | 131 | this.clientFactory = Require.nonNull("HTTP client factory", clientFactory);
|
131 | 132 | this.sessions = Require.nonNull("Session map", sessions);
|
132 | 133 | this.model = new GridModel(bus);
|
133 |
| - this.nodes = new HashMap<>(); |
| 134 | + this.nodes = new ConcurrentHashMap<>(); |
134 | 135 | this.sessionRequests = Require.nonNull("New Session Request Queue", sessionRequests);
|
135 | 136 | this.registrationSecret = Require.nonNull("Registration secret", registrationSecret);
|
136 | 137 | this.healthcheckInterval = Require.nonNull("Health check interval", healthcheckInterval);
|
137 | 138 |
|
138 | 139 | bus.addListener(NodeStatusEvent.listener(this::register));
|
139 | 140 | bus.addListener(NodeStatusEvent.listener(model::refresh));
|
140 |
| - bus.addListener(NodeHeartBeatEvent.listener(model::touch)); |
| 141 | + bus.addListener(NodeHeartBeatEvent.listener(nodeStatus -> { |
| 142 | + if (nodes.containsKey(nodeStatus.getId())) { |
| 143 | + model.touch(nodeStatus.getId()); |
| 144 | + } else { |
| 145 | + register(nodeStatus); |
| 146 | + } |
| 147 | + })); |
141 | 148 | bus.addListener(NodeDrainComplete.listener(this::remove));
|
142 | 149 | bus.addListener(NewSessionRequestEvent.listener(requestIds::offer));
|
143 | 150 |
|
|
0 commit comments