Skip to content

Commit 823474f

Browse files
authored
Make TE allocation strictly on cores (#418)
1 parent 8a0bfad commit 823474f

File tree

8 files changed

+107
-50
lines changed

8 files changed

+107
-50
lines changed

mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/ResourceCluster.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package io.mantisrx.server.master.resourcecluster;
1818

1919
import io.mantisrx.common.Ack;
20-
import io.mantisrx.runtime.MachineDefinition;
2120
import io.mantisrx.server.core.domain.WorkerId;
2221
import io.mantisrx.server.worker.TaskExecutorGateway;
2322
import java.time.Instant;
@@ -72,7 +71,7 @@ public interface ResourceCluster extends ResourceClusterGateway {
7271
* @param workerId worker id of the task that's going to run on the node.
7372
* @return task executor assigned for the particular task.
7473
*/
75-
CompletableFuture<TaskExecutorID> getTaskExecutorFor(MachineDefinition machineDefinition, WorkerId workerId);
74+
CompletableFuture<TaskExecutorID> getTaskExecutorFor(TaskExecutorAllocationRequest allocationRequest);
7675

7776
CompletableFuture<TaskExecutorGateway> getTaskExecutorGateway(TaskExecutorID taskExecutorID);
7877

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2023 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.mantisrx.server.master.resourcecluster;
18+
19+
import io.mantisrx.runtime.MachineDefinition;
20+
import io.mantisrx.server.core.domain.WorkerId;
21+
import lombok.AllArgsConstructor;
22+
import lombok.Value;
23+
24+
@Value
25+
@AllArgsConstructor(staticName = "of")
26+
public class TaskExecutorAllocationRequest {
27+
WorkerId workerId;
28+
MachineDefinition machineDefinition;
29+
}

mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -208,29 +208,32 @@ public Optional<Entry<TaskExecutorID, TaskExecutorState>> findFirst(
208208

209209
@Override
210210
public Optional<Pair<TaskExecutorID, TaskExecutorState>> findBestFit(TaskExecutorAssignmentRequest request) {
211-
for (Entry<Double, Set<TaskExecutorID>> entry :
212-
this.executorByCores.tailMap(request.getMachineDefinition().getCpuCores()).entrySet()) {
213-
214-
Optional<TaskExecutorID> executorO = entry.getValue()
215-
.stream()
216-
.filter(tid -> {
217-
if (!this.taskExecutorStateMap.containsKey(tid)) {
218-
return false;
219-
}
220-
221-
TaskExecutorState st = this.taskExecutorStateMap.get(tid);
222-
return st.isAvailable() &&
223-
st.getRegistration() != null &&
224-
st.getRegistration().getMachineDefinition().canFit(request.getMachineDefinition());
225-
})
226-
.findAny();
227-
228-
if (executorO.isPresent()) {
229-
return Optional.of(Pair.of(executorO.get(), this.taskExecutorStateMap.get(executorO.get())));
230-
}
211+
// only allow allocation in the lowest CPU cores matching group.
212+
SortedMap<Double, Set<TaskExecutorID>> targetMap =
213+
this.executorByCores.tailMap(request.getAllocationRequest().getMachineDefinition().getCpuCores());
214+
215+
if (targetMap.size() < 1) {
216+
log.warn("Cannot find any executor for request: {}", request);
217+
return Optional.empty();
231218
}
219+
Double targetCoreCount = targetMap.firstKey();
220+
log.trace("Applying assignmentReq: {} to {} cores.", request, targetCoreCount);
232221

233-
return Optional.empty();
222+
return this.executorByCores.get(targetCoreCount)
223+
.stream()
224+
.filter(tid -> {
225+
if (!this.taskExecutorStateMap.containsKey(tid)) {
226+
return false;
227+
}
228+
229+
TaskExecutorState st = this.taskExecutorStateMap.get(tid);
230+
return st.isAvailable() &&
231+
st.getRegistration() != null &&
232+
st.getRegistration().getMachineDefinition().canFit(
233+
request.getAllocationRequest().getMachineDefinition());
234+
})
235+
.findAny()
236+
.map(taskExecutorID -> Pair.of(taskExecutorID, this.taskExecutorStateMap.get(taskExecutorID)));
234237
}
235238

236239
@Override

mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,14 @@
2929
import io.mantisrx.master.resourcecluster.metrics.ResourceClusterActorMetrics;
3030
import io.mantisrx.master.resourcecluster.proto.GetClusterIdleInstancesRequest;
3131
import io.mantisrx.master.resourcecluster.proto.GetClusterIdleInstancesResponse;
32-
import io.mantisrx.runtime.MachineDefinition;
3332
import io.mantisrx.server.core.domain.WorkerId;
3433
import io.mantisrx.server.master.persistence.MantisJobStore;
3534
import io.mantisrx.server.master.resourcecluster.ClusterID;
3635
import io.mantisrx.server.master.resourcecluster.PagedActiveJobOverview;
3736
import io.mantisrx.server.master.resourcecluster.ResourceCluster.NoResourceAvailableException;
3837
import io.mantisrx.server.master.resourcecluster.ResourceCluster.ResourceOverview;
3938
import io.mantisrx.server.master.resourcecluster.ResourceCluster.TaskExecutorStatus;
39+
import io.mantisrx.server.master.resourcecluster.TaskExecutorAllocationRequest;
4040
import io.mantisrx.server.master.resourcecluster.TaskExecutorDisconnection;
4141
import io.mantisrx.server.master.resourcecluster.TaskExecutorHeartbeat;
4242
import io.mantisrx.server.master.resourcecluster.TaskExecutorID;
@@ -445,7 +445,7 @@ private void onTaskExecutorAssignmentRequest(TaskExecutorAssignmentRequest reque
445445

446446
if (matchedExecutor.isPresent()) {
447447
log.info("matched executor {} for request {}", matchedExecutor.get().getKey(), request);
448-
matchedExecutor.get().getValue().onAssignment(request.getWorkerId());
448+
matchedExecutor.get().getValue().onAssignment(request.getAllocationRequest().getWorkerId());
449449
// let's give some time for the assigned executor to be scheduled work. otherwise, the assigned executor
450450
// will be returned back to the pool.
451451
getTimers().startSingleTimer(
@@ -456,7 +456,15 @@ private void onTaskExecutorAssignmentRequest(TaskExecutorAssignmentRequest reque
456456
} else {
457457
metrics.incrementCounter(
458458
ResourceClusterActorMetrics.NO_RESOURCES_AVAILABLE,
459-
TagList.create(ImmutableMap.of("resourceCluster", clusterID.getResourceID(), "workerId", request.getWorkerId().getId(), "jobCluster", request.getWorkerId().getJobCluster(), "jobId", request.getWorkerId().getJobId())));
459+
TagList.create(ImmutableMap.of(
460+
"resourceCluster",
461+
clusterID.getResourceID(),
462+
"workerId",
463+
request.getAllocationRequest().getWorkerId().getId(),
464+
"jobCluster",
465+
request.getAllocationRequest().getWorkerId().getJobCluster(),
466+
"jobId",
467+
request.getAllocationRequest().getWorkerId().getJobId())));
460468
sender().tell(new Status.Failure(new NoResourceAvailableException(
461469
String.format("No resource available for request %s: resource overview: %s", request,
462470
getResourceOverview()))), self());
@@ -596,8 +604,7 @@ private static class HeartbeatTimeout {
596604

597605
@Value
598606
static class TaskExecutorAssignmentRequest {
599-
MachineDefinition machineDefinition;
600-
WorkerId workerId;
607+
TaskExecutorAllocationRequest allocationRequest;
601608
ClusterID clusterID;
602609
}
603610

mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterAkkaImpl.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,12 @@
3434
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorInfoRequest;
3535
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorsList;
3636
import io.mantisrx.master.resourcecluster.ResourceClusterScalerActor.TriggerClusterRuleRefreshRequest;
37-
import io.mantisrx.runtime.MachineDefinition;
3837
import io.mantisrx.server.core.domain.WorkerId;
3938
import io.mantisrx.server.master.resourcecluster.ClusterID;
4039
import io.mantisrx.server.master.resourcecluster.PagedActiveJobOverview;
4140
import io.mantisrx.server.master.resourcecluster.ResourceCluster;
4241
import io.mantisrx.server.master.resourcecluster.ResourceClusterTaskExecutorMapper;
42+
import io.mantisrx.server.master.resourcecluster.TaskExecutorAllocationRequest;
4343
import io.mantisrx.server.master.resourcecluster.TaskExecutorID;
4444
import io.mantisrx.server.master.resourcecluster.TaskExecutorRegistration;
4545
import io.mantisrx.server.worker.TaskExecutorGateway;
@@ -133,11 +133,10 @@ public CompletableFuture<ResourceOverview> resourceOverview() {
133133
}
134134

135135
@Override
136-
public CompletableFuture<TaskExecutorID> getTaskExecutorFor(MachineDefinition machineDefinition,
137-
WorkerId workerId) {
136+
public CompletableFuture<TaskExecutorID> getTaskExecutorFor(TaskExecutorAllocationRequest allocationRequest) {
138137
return
139138
Patterns
140-
.ask(resourceClusterManagerActor, new TaskExecutorAssignmentRequest(machineDefinition, workerId, clusterID), askTimeout)
139+
.ask(resourceClusterManagerActor, new TaskExecutorAssignmentRequest(allocationRequest, clusterID), askTimeout)
141140
.thenApply(TaskExecutorID.class::cast)
142141
.toCompletableFuture();
143142
}

mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.mantisrx.server.core.domain.WorkerId;
3030
import io.mantisrx.server.master.ExecuteStageRequestFactory;
3131
import io.mantisrx.server.master.resourcecluster.ResourceCluster;
32+
import io.mantisrx.server.master.resourcecluster.TaskExecutorAllocationRequest;
3233
import io.mantisrx.server.master.resourcecluster.TaskExecutorID;
3334
import io.mantisrx.server.master.resourcecluster.TaskExecutorRegistration;
3435
import io.mantisrx.server.worker.TaskExecutorGateway;
@@ -118,8 +119,9 @@ private void onScheduleRequestEvent(ScheduleRequestEvent event) {
118119

119120
CompletableFuture<Object> assignedFuture =
120121
resourceCluster
121-
.getTaskExecutorFor(event.request.getMachineDefinition(),
122-
event.request.getWorkerId())
122+
.getTaskExecutorFor(
123+
TaskExecutorAllocationRequest.of(
124+
event.getRequest().getWorkerId(), event.getRequest().getMachineDefinition()))
123125
.<Object>thenApply(event::onAssignment)
124126
.exceptionally(event::onFailure);
125127

mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerTests.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.mantisrx.server.core.TestingRpcService;
2929
import io.mantisrx.server.core.domain.WorkerId;
3030
import io.mantisrx.server.master.resourcecluster.ClusterID;
31+
import io.mantisrx.server.master.resourcecluster.TaskExecutorAllocationRequest;
3132
import io.mantisrx.server.master.resourcecluster.TaskExecutorID;
3233
import io.mantisrx.server.master.resourcecluster.TaskExecutorRegistration;
3334
import io.mantisrx.server.master.resourcecluster.TaskExecutorReport;
@@ -119,7 +120,9 @@ public void setup() {
119120
@Test
120121
public void testGetBestFit() {
121122
Optional<Pair<TaskExecutorID, TaskExecutorState>> bestFitO =
122-
stateManager.findBestFit(new TaskExecutorAssignmentRequest(MACHINE_DEFINITION_2, WORKER_ID, CLUSTER_ID));
123+
stateManager.findBestFit(new TaskExecutorAssignmentRequest(
124+
TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_2),
125+
CLUSTER_ID));
123126

124127
assertFalse(bestFitO.isPresent());
125128

@@ -141,26 +144,37 @@ public void testGetBestFit() {
141144

142145
// test machine def 1
143146
bestFitO =
144-
stateManager.findBestFit(new TaskExecutorAssignmentRequest(MACHINE_DEFINITION_1, WORKER_ID, CLUSTER_ID));
147+
stateManager.findBestFit(new TaskExecutorAssignmentRequest(
148+
TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_1), CLUSTER_ID));
145149
assertTrue(bestFitO.isPresent());
146150
assertEquals(TASK_EXECUTOR_ID_1, bestFitO.get().getLeft());
147151
assertEquals(state1, bestFitO.get().getRight());
148152

149153
bestFitO =
150-
stateManager.findBestFit(new TaskExecutorAssignmentRequest(MACHINE_DEFINITION_2, WORKER_ID, CLUSTER_ID));
154+
stateManager.findBestFit(new TaskExecutorAssignmentRequest(
155+
TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_2), CLUSTER_ID));
151156

152157
assertTrue(bestFitO.isPresent());
153158
assertEquals(TASK_EXECUTOR_ID_2, bestFitO.get().getLeft());
154159
assertEquals(state2, bestFitO.get().getRight());
155160

161+
// disable e1 and should get nothing
162+
state1.onTaskExecutorStatusChange(new TaskExecutorStatusChange(TASK_EXECUTOR_ID_1, CLUSTER_ID,
163+
TaskExecutorReport.occupied(WORKER_ID)));
164+
bestFitO =
165+
stateManager.findBestFit(new TaskExecutorAssignmentRequest(
166+
TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_1), CLUSTER_ID));
167+
assertFalse(bestFitO.isPresent());
168+
156169
// enable e3 and disable e2
157170
state3.onTaskExecutorStatusChange(new TaskExecutorStatusChange(TASK_EXECUTOR_ID_3, CLUSTER_ID,
158171
TaskExecutorReport.available()));
159172
state2.onTaskExecutorStatusChange(new TaskExecutorStatusChange(TASK_EXECUTOR_ID_2, CLUSTER_ID,
160173
TaskExecutorReport.occupied(WORKER_ID)));
161174

162175
bestFitO =
163-
stateManager.findBestFit(new TaskExecutorAssignmentRequest(MACHINE_DEFINITION_2, WORKER_ID, CLUSTER_ID));
176+
stateManager.findBestFit(new TaskExecutorAssignmentRequest(
177+
TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_2), CLUSTER_ID));
164178

165179
assertTrue(bestFitO.isPresent());
166180
assertEquals(TASK_EXECUTOR_ID_3, bestFitO.get().getLeft());
@@ -169,7 +183,8 @@ public void testGetBestFit() {
169183
// test mark as unavailable
170184
stateManager.markUnavailable(TASK_EXECUTOR_ID_3);
171185
bestFitO =
172-
stateManager.findBestFit(new TaskExecutorAssignmentRequest(MACHINE_DEFINITION_2, WORKER_ID, CLUSTER_ID));
186+
stateManager.findBestFit(new TaskExecutorAssignmentRequest(
187+
TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_2), CLUSTER_ID));
173188

174189
assertFalse(bestFitO.isPresent());
175190
}

mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorTest.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import io.mantisrx.server.master.resourcecluster.ResourceCluster;
4646
import io.mantisrx.server.master.resourcecluster.ResourceCluster.ResourceOverview;
4747
import io.mantisrx.server.master.resourcecluster.ResourceClusterTaskExecutorMapper;
48+
import io.mantisrx.server.master.resourcecluster.TaskExecutorAllocationRequest;
4849
import io.mantisrx.server.master.resourcecluster.TaskExecutorHeartbeat;
4950
import io.mantisrx.server.master.resourcecluster.TaskExecutorID;
5051
import io.mantisrx.server.master.resourcecluster.TaskExecutorRegistration;
@@ -120,7 +121,7 @@ public class ResourceClusterActorTest {
120121
.taskExecutorAddress(TASK_EXECUTOR_ADDRESS)
121122
.hostname(HOST_NAME)
122123
.workerPorts(WORKER_PORTS)
123-
.machineDefinition(MACHINE_DEFINITION_2)
124+
.machineDefinition(MACHINE_DEFINITION)
124125
.taskExecutorAttributes(
125126
ImmutableMap.of(
126127
WorkerConstants.WORKER_CONTAINER_DEFINITION_ID, CONTAINER_DEF_ID_2.getResourceID(),
@@ -225,7 +226,7 @@ public void testGetFreeTaskExecutors() throws Exception {
225226
TaskExecutorReport.available())).get());
226227
assertEquals(
227228
TASK_EXECUTOR_ID,
228-
resourceCluster.getTaskExecutorFor(MACHINE_DEFINITION, WORKER_ID).get());
229+
resourceCluster.getTaskExecutorFor(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION)).get());
229230
assertEquals(
230231
TASK_EXECUTOR_ID,
231232
resourceCluster.getTaskExecutorAssignedFor(WORKER_ID).get());
@@ -310,7 +311,7 @@ public void testGetTaskExecutorsUsageAndList() throws Exception {
310311

311312
assertEquals(
312313
TASK_EXECUTOR_ID_3,
313-
resourceCluster.getTaskExecutorFor(MACHINE_DEFINITION_2, WORKER_ID).get());
314+
resourceCluster.getTaskExecutorFor(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION_2)).get());
314315

315316
probe = new TestKit(actorSystem);
316317
resourceClusterActor.tell(new GetClusterUsageRequest(
@@ -344,7 +345,7 @@ public void testGetTaskExecutorsUsageAndList() throws Exception {
344345

345346
assertEquals(
346347
TASK_EXECUTOR_ID_2,
347-
resourceCluster.getTaskExecutorFor(MACHINE_DEFINITION_2, WORKER_ID).get());
348+
resourceCluster.getTaskExecutorFor(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION)).get());
348349
probe = new TestKit(actorSystem);
349350
resourceClusterActor.tell(new GetClusterUsageRequest(
350351
CLUSTER_ID, ResourceClusterScalerActor.groupKeyFromTaskExecutorDefinitionIdFunc),
@@ -399,13 +400,13 @@ public void testAssignmentTimeout() throws Exception {
399400
TaskExecutorReport.available())).get());
400401
assertEquals(
401402
TASK_EXECUTOR_ID,
402-
resourceCluster.getTaskExecutorFor(MACHINE_DEFINITION, WORKER_ID).get());
403+
resourceCluster.getTaskExecutorFor(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION)).get());
403404
assertEquals(ImmutableList.of(), resourceCluster.getAvailableTaskExecutors().get());
404405
Thread.sleep(2000);
405406
assertEquals(ImmutableList.of(TASK_EXECUTOR_ID), resourceCluster.getAvailableTaskExecutors().get());
406407
assertEquals(
407408
TASK_EXECUTOR_ID,
408-
resourceCluster.getTaskExecutorFor(MACHINE_DEFINITION, WORKER_ID).get());
409+
resourceCluster.getTaskExecutorFor(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION)).get());
409410
}
410411

411412
@Test
@@ -446,9 +447,7 @@ public void testGetMultipleActiveJobs() throws ExecutionException, InterruptedEx
446447

447448
assertEquals(
448449
taskExecutorID,
449-
resourceCluster.getTaskExecutorFor(
450-
MACHINE_DEFINITION,
451-
workerId)
450+
resourceCluster.getTaskExecutorFor(TaskExecutorAllocationRequest.of(workerId, MACHINE_DEFINITION))
452451
.get());
453452
}
454453

@@ -558,7 +557,9 @@ public void testIfDisabledTaskExecutorsAreNotAvailableForScheduling() throws Exc
558557
resourceCluster.heartBeatFromTaskExecutor(
559558
new TaskExecutorHeartbeat(TASK_EXECUTOR_ID_2, CLUSTER_ID, TaskExecutorReport.available())).get());
560559
resourceCluster.disableTaskExecutorsFor(ATTRIBUTES, Instant.now().plus(Duration.ofDays(1))).get();
561-
assertEquals(TASK_EXECUTOR_ID_2, resourceCluster.getTaskExecutorFor(MACHINE_DEFINITION, WORKER_ID).get());
560+
assertEquals(
561+
TASK_EXECUTOR_ID_2,
562+
resourceCluster.getTaskExecutorFor(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION)).get());
562563
}
563564

564565
@Test
@@ -589,7 +590,9 @@ public void testGetAssignedTaskExecutorAfterTaskCompletes() throws Throwable {
589590
resourceCluster.heartBeatFromTaskExecutor(
590591
new TaskExecutorHeartbeat(TASK_EXECUTOR_ID, CLUSTER_ID, TaskExecutorReport.available())).join());
591592

592-
assertEquals(TASK_EXECUTOR_ID, resourceCluster.getTaskExecutorFor(MACHINE_DEFINITION, WORKER_ID).join());
593+
assertEquals(
594+
TASK_EXECUTOR_ID,
595+
resourceCluster.getTaskExecutorFor(TaskExecutorAllocationRequest.of(WORKER_ID, MACHINE_DEFINITION)).join());
593596
assertEquals(TASK_EXECUTOR_ID, resourceCluster.getTaskExecutorAssignedFor(WORKER_ID).join());
594597
assertEquals(Ack.getInstance(), resourceCluster.notifyTaskExecutorStatusChange(
595598
new TaskExecutorStatusChange(TASK_EXECUTOR_ID, CLUSTER_ID, TaskExecutorReport.occupied(WORKER_ID))).join());

0 commit comments

Comments
 (0)