Skip to content

Commit 58794ad

Browse files
kaushalmahi12kiranprakash154jainankitk
authored
Add cancellation framework changes in wlm (#15651)
* cancellation related Signed-off-by: Kiran Prakash <[email protected]> * Update CHANGELOG.md Signed-off-by: Kiran Prakash <[email protected]> * add better cancellation reason Signed-off-by: Kiran Prakash <[email protected]> * Update DefaultTaskCancellationTests.java Signed-off-by: Kiran Prakash <[email protected]> * refactor Signed-off-by: Kiran Prakash <[email protected]> * refactor Signed-off-by: Kiran Prakash <[email protected]> * Update DefaultTaskCancellation.java Signed-off-by: Kiran Prakash <[email protected]> * Update DefaultTaskCancellation.java Signed-off-by: Kiran Prakash <[email protected]> * Update DefaultTaskCancellation.java Signed-off-by: Kiran Prakash <[email protected]> * Update DefaultTaskSelectionStrategy.java Signed-off-by: Kiran Prakash <[email protected]> * refactor Signed-off-by: Kiran Prakash <[email protected]> * refactor node level threshold Signed-off-by: Kiran Prakash <[email protected]> * use query group task Signed-off-by: Kaushal Kumar <[email protected]> * code clean up and refactorings Signed-off-by: Kaushal Kumar <[email protected]> * add unit tests and fix existing ones Signed-off-by: Kaushal Kumar <[email protected]> * uncomment the test case Signed-off-by: Kaushal Kumar <[email protected]> * update CHANGELOG Signed-off-by: Kaushal Kumar <[email protected]> * fix imports Signed-off-by: Kaushal Kumar <[email protected]> * refactor and add UTs for new constructs Signed-off-by: Kaushal Kumar <[email protected]> * fix javadocs Signed-off-by: Kaushal Kumar <[email protected]> * remove code clutter Signed-off-by: Kaushal Kumar <[email protected]> * change annotation version and task selection strategy Signed-off-by: Kaushal Kumar <[email protected]> * rename a util class Signed-off-by: Kaushal Kumar <[email protected]> * remove wrappers from resource type Signed-off-by: Kaushal Kumar <[email protected]> * apply spotless Signed-off-by: Kaushal Kumar <[email protected]> * address comments Signed-off-by: Kaushal Kumar <[email 
protected]> * add rename changes Signed-off-by: Kaushal Kumar <[email protected]> * address comments Signed-off-by: Kaushal Kumar <[email protected]> * refactor changes and logical bug fix Signed-off-by: Kaushal Kumar <[email protected]> * address comments Signed-off-by: Kaushal Kumar <[email protected]> --------- Signed-off-by: Kiran Prakash <[email protected]> Signed-off-by: Kaushal Kumar <[email protected]> Signed-off-by: Ankit Jain <[email protected]> Co-authored-by: Kiran Prakash <[email protected]> Co-authored-by: Ankit Jain <[email protected]>
1 parent 9354dd9 commit 58794ad

20 files changed

+1297
-98
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
77
### Added
88
- Adding WithFieldName interface for QueryBuilders with fieldName ([#15705](https://github.com/opensearch-project/OpenSearch/pull/15705))
99
- MultiTermQueries in keyword fields now default to `indexed` approach and gated behind cluster setting ([#15637](https://github.com/opensearch-project/OpenSearch/pull/15637))
10+
- [Workload Management] QueryGroup resource cancellation framework changes ([#15651](https://github.com/opensearch-project/OpenSearch/pull/15651))
1011

1112
### Dependencies
1213
- Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578))

server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.opensearch.cluster.Diff;
1313
import org.opensearch.common.UUIDs;
1414
import org.opensearch.common.annotation.ExperimentalApi;
15+
import org.opensearch.common.annotation.PublicApi;
1516
import org.opensearch.core.common.io.stream.StreamInput;
1617
import org.opensearch.core.common.io.stream.StreamOutput;
1718
import org.opensearch.core.xcontent.ToXContentObject;
@@ -41,7 +42,7 @@
4142
* "updated_at": 4513232415
4243
* }
4344
*/
44-
@ExperimentalApi
45+
@PublicApi(since = "2.18.0")
4546
public class QueryGroup extends AbstractDiffable<QueryGroup> implements ToXContentObject {
4647

4748
public static final String _ID_STRING = "_id";

server/src/main/java/org/opensearch/wlm/QueryGroupLevelResourceUsageView.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@
88

99
package org.opensearch.wlm;
1010

11-
import org.opensearch.tasks.Task;
12-
1311
import java.util.List;
1412
import java.util.Map;
1513

@@ -20,11 +18,11 @@
2018
*/
2119
public class QueryGroupLevelResourceUsageView {
2220
// resourceUsage holds the resource usage data for a QueryGroup at a point in time
23-
private final Map<ResourceType, Long> resourceUsage;
21+
private final Map<ResourceType, Double> resourceUsage;
2422
// activeTasks holds the list of active tasks for a QueryGroup at a point in time
25-
private final List<Task> activeTasks;
23+
private final List<QueryGroupTask> activeTasks;
2624

27-
public QueryGroupLevelResourceUsageView(Map<ResourceType, Long> resourceUsage, List<Task> activeTasks) {
25+
public QueryGroupLevelResourceUsageView(Map<ResourceType, Double> resourceUsage, List<QueryGroupTask> activeTasks) {
2826
this.resourceUsage = resourceUsage;
2927
this.activeTasks = activeTasks;
3028
}
@@ -34,7 +32,7 @@ public QueryGroupLevelResourceUsageView(Map<ResourceType, Long> resourceUsage, L
3432
*
3533
* @return The map of resource usage data
3634
*/
37-
public Map<ResourceType, Long> getResourceUsageData() {
35+
public Map<ResourceType, Double> getResourceUsageData() {
3836
return resourceUsage;
3937
}
4038

@@ -43,7 +41,7 @@ public Map<ResourceType, Long> getResourceUsageData() {
4341
*
4442
* @return The list of active tasks
4543
*/
46-
public List<Task> getActiveTasks() {
44+
public List<QueryGroupTask> getActiveTasks() {
4745
return activeTasks;
4846
}
4947
}

server/src/main/java/org/opensearch/wlm/QueryGroupTask.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,29 +10,33 @@
1010

1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.common.annotation.PublicApi;
1314
import org.opensearch.common.unit.TimeValue;
1415
import org.opensearch.common.util.concurrent.ThreadContext;
1516
import org.opensearch.core.tasks.TaskId;
1617
import org.opensearch.tasks.CancellableTask;
1718

1819
import java.util.Map;
1920
import java.util.Optional;
21+
import java.util.function.LongSupplier;
2022
import java.util.function.Supplier;
2123

2224
import static org.opensearch.search.SearchService.NO_TIMEOUT;
2325

2426
/**
2527
* Base class to define QueryGroup tasks
2628
*/
29+
@PublicApi(since = "2.18.0")
2730
public class QueryGroupTask extends CancellableTask {
2831

2932
private static final Logger logger = LogManager.getLogger(QueryGroupTask.class);
3033
public static final String QUERY_GROUP_ID_HEADER = "queryGroupId";
3134
public static final Supplier<String> DEFAULT_QUERY_GROUP_ID_SUPPLIER = () -> "DEFAULT_QUERY_GROUP";
35+
private final LongSupplier nanoTimeSupplier;
3236
private String queryGroupId;
3337

3438
public QueryGroupTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
35-
this(id, type, action, description, parentTaskId, headers, NO_TIMEOUT);
39+
this(id, type, action, description, parentTaskId, headers, NO_TIMEOUT, System::nanoTime);
3640
}
3741

3842
public QueryGroupTask(
@@ -43,8 +47,22 @@ public QueryGroupTask(
4347
TaskId parentTaskId,
4448
Map<String, String> headers,
4549
TimeValue cancelAfterTimeInterval
50+
) {
51+
this(id, type, action, description, parentTaskId, headers, cancelAfterTimeInterval, System::nanoTime);
52+
}
53+
54+
public QueryGroupTask(
55+
long id,
56+
String type,
57+
String action,
58+
String description,
59+
TaskId parentTaskId,
60+
Map<String, String> headers,
61+
TimeValue cancelAfterTimeInterval,
62+
LongSupplier nanoTimeSupplier
4663
) {
4764
super(id, type, action, description, parentTaskId, headers, cancelAfterTimeInterval);
65+
this.nanoTimeSupplier = nanoTimeSupplier;
4866
}
4967

5068
/**
@@ -69,6 +87,10 @@ public final void setQueryGroupId(final ThreadContext threadContext) {
6987
.orElse(DEFAULT_QUERY_GROUP_ID_SUPPLIER.get());
7088
}
7189

90+
public long getElapsedTime() {
91+
return nanoTimeSupplier.getAsLong() - getStartTimeNanos();
92+
}
93+
7294
@Override
7395
public boolean shouldCancelChildrenOnCancellation() {
7496
return false;

server/src/main/java/org/opensearch/wlm/ResourceType.java

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@
1010

1111
import org.opensearch.common.annotation.PublicApi;
1212
import org.opensearch.core.common.io.stream.StreamOutput;
13-
import org.opensearch.core.tasks.resourcetracker.ResourceStats;
14-
import org.opensearch.tasks.Task;
13+
import org.opensearch.wlm.tracker.CpuUsageCalculator;
14+
import org.opensearch.wlm.tracker.MemoryUsageCalculator;
15+
import org.opensearch.wlm.tracker.ResourceUsageCalculator;
1516

1617
import java.io.IOException;
1718
import java.util.List;
@@ -24,19 +25,25 @@
2425
*/
2526
@PublicApi(since = "2.17.0")
2627
public enum ResourceType {
27-
CPU("cpu", task -> task.getTotalResourceUtilization(ResourceStats.CPU), true),
28-
MEMORY("memory", task -> task.getTotalResourceUtilization(ResourceStats.MEMORY), true);
28+
CPU("cpu", true, CpuUsageCalculator.INSTANCE, WorkloadManagementSettings::getNodeLevelCpuCancellationThreshold),
29+
MEMORY("memory", true, MemoryUsageCalculator.INSTANCE, WorkloadManagementSettings::getNodeLevelMemoryCancellationThreshold);
2930

3031
private final String name;
31-
private final Function<Task, Long> getResourceUsage;
3232
private final boolean statsEnabled;
33-
33+
private final ResourceUsageCalculator resourceUsageCalculator;
34+
private final Function<WorkloadManagementSettings, Double> nodeLevelThresholdSupplier;
3435
private static List<ResourceType> sortedValues = List.of(CPU, MEMORY);
3536

36-
ResourceType(String name, Function<Task, Long> getResourceUsage, boolean statsEnabled) {
37+
ResourceType(
38+
String name,
39+
boolean statsEnabled,
40+
ResourceUsageCalculator resourceUsageCalculator,
41+
Function<WorkloadManagementSettings, Double> nodeLevelThresholdSupplier
42+
) {
3743
this.name = name;
38-
this.getResourceUsage = getResourceUsage;
3944
this.statsEnabled = statsEnabled;
45+
this.resourceUsageCalculator = resourceUsageCalculator;
46+
this.nodeLevelThresholdSupplier = nodeLevelThresholdSupplier;
4047
}
4148

4249
/**
@@ -61,20 +68,18 @@ public String getName() {
6168
return name;
6269
}
6370

64-
/**
65-
* Gets the resource usage for a given resource type and task.
66-
*
67-
* @param task the task for which to calculate resource usage
68-
* @return the resource usage
69-
*/
70-
public long getResourceUsage(Task task) {
71-
return getResourceUsage.apply(task);
72-
}
73-
7471
public boolean hasStatsEnabled() {
7572
return statsEnabled;
7673
}
7774

75+
public ResourceUsageCalculator getResourceUsageCalculator() {
76+
return resourceUsageCalculator;
77+
}
78+
79+
public double getNodeLevelThreshold(WorkloadManagementSettings settings) {
80+
return nodeLevelThresholdSupplier.apply(settings);
81+
}
82+
7883
public static List<ResourceType> getSortedValues() {
7984
return sortedValues;
8085
}

server/src/main/java/org/opensearch/wlm/WorkloadManagementSettings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,15 @@
88

99
package org.opensearch.wlm;
1010

11+
import org.opensearch.common.annotation.PublicApi;
1112
import org.opensearch.common.settings.ClusterSettings;
1213
import org.opensearch.common.settings.Setting;
1314
import org.opensearch.common.settings.Settings;
1415

1516
/**
1617
* Main class to declare Workload Management related settings
1718
*/
19+
@PublicApi(since = "2.18.0")
1820
public class WorkloadManagementSettings {
1921
private static final Double DEFAULT_NODE_LEVEL_MEMORY_REJECTION_THRESHOLD = 0.8;
2022
private static final Double DEFAULT_NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD = 0.9;
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.wlm.cancellation;
10+
11+
import org.opensearch.wlm.QueryGroupTask;
12+
import org.opensearch.wlm.ResourceType;
13+
14+
import java.util.ArrayList;
15+
import java.util.Collections;
16+
import java.util.Comparator;
17+
import java.util.List;
18+
import java.util.stream.Collectors;
19+
20+
import static org.opensearch.wlm.cancellation.QueryGroupTaskCancellationService.MIN_VALUE;
21+
22+
/**
23+
* Task selection strategy that picks the highest resource-consuming tasks first.
24+
*/
25+
public class MaximumResourceTaskSelectionStrategy implements TaskSelectionStrategy {
26+
27+
public MaximumResourceTaskSelectionStrategy() {}
28+
29+
/**
30+
* Returns a comparator that defines the sorting condition for tasks.
31+
* This is the default implementation since the most resource consuming tasks are the likely to regress the performance.
32+
* from resiliency point of view it makes sense to cancel them first
33+
*
34+
* @return The comparator
35+
*/
36+
private Comparator<QueryGroupTask> sortingCondition(ResourceType resourceType) {
37+
return Comparator.comparingDouble(task -> resourceType.getResourceUsageCalculator().calculateTaskResourceUsage(task));
38+
}
39+
40+
/**
41+
* Selects tasks for cancellation based on the provided limit and resource type.
42+
* The tasks are sorted based on the sorting condition and then selected until the accumulated resource usage reaches the limit.
43+
*
44+
* @param tasks The list of tasks from which to select
45+
* @param limit The limit on the accumulated resource usage
46+
* @param resourceType The type of resource to consider
47+
* @return The list of selected tasks
48+
* @throws IllegalArgumentException If the limit is less than zero
49+
*/
50+
public List<QueryGroupTask> selectTasksForCancellation(List<QueryGroupTask> tasks, double limit, ResourceType resourceType) {
51+
if (limit < 0) {
52+
throw new IllegalArgumentException("limit has to be greater than zero");
53+
}
54+
if (limit < MIN_VALUE) {
55+
return Collections.emptyList();
56+
}
57+
58+
List<QueryGroupTask> sortedTasks = tasks.stream().sorted(sortingCondition(resourceType).reversed()).collect(Collectors.toList());
59+
60+
List<QueryGroupTask> selectedTasks = new ArrayList<>();
61+
double accumulated = 0;
62+
for (QueryGroupTask task : sortedTasks) {
63+
selectedTasks.add(task);
64+
accumulated += resourceType.getResourceUsageCalculator().calculateTaskResourceUsage(task);
65+
if ((accumulated - limit) > MIN_VALUE) {
66+
break;
67+
}
68+
}
69+
return selectedTasks;
70+
}
71+
}

0 commit comments

Comments
 (0)