Skip to content

Commit 63937c0

Browse files
kaushalmahi12shiv0408
authored andcommitted
add taskCompletionCount in search_backpressure stats (opensearch-project#10028)
* add taskCompletionCount in search_backpressure Signed-off-by: Kaushal Kumar <[email protected]> * address comments to use final objects Signed-off-by: Kaushal Kumar <[email protected]> * do spotless check run Signed-off-by: Kaushal Kumar <[email protected]> * use primitive long to store completionCount Signed-off-by: Kaushal Kumar <[email protected]> * use primitive long to store completionCount in searchTaskStats Signed-off-by: Kaushal Kumar <[email protected]> * add pr link against the change Signed-off-by: Kaushal Kumar <[email protected]> * add taskCompletionCount in search_backpressure Signed-off-by: Kaushal Kumar <[email protected]> * address comments to use final objects Signed-off-by: Kaushal Kumar <[email protected]> * do spotless check run Signed-off-by: Kaushal Kumar <[email protected]> * use primitive long to store completionCount Signed-off-by: Kaushal Kumar <[email protected]> * rebase with upstream main Signed-off-by: Kaushal Kumar <[email protected]> * rebase with upstream main Signed-off-by: Kaushal Kumar <[email protected]> --------- Signed-off-by: Kaushal Kumar <[email protected]> Signed-off-by: Shivansh Arora <[email protected]>
1 parent 7c0e943 commit 63937c0

File tree

7 files changed

+53
-10
lines changed

7 files changed

+53
-10
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
5454
- Change http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773))
5555
- Improve summary error message for invalid setting updates ([#4792](https://github.com/opensearch-project/OpenSearch/pull/4792))
5656
- Return 409 Conflict HTTP status instead of 503 on failure to concurrently execute snapshots ([#8986](https://github.com/opensearch-project/OpenSearch/pull/5855))
57+
- Add task completion count in search backpressure stats API ([#10028](https://github.com/opensearch-project/OpenSearch/pull/10028/))
5758
- Performance improvement for Datetime field caching ([#4558](https://github.com/opensearch-project/OpenSearch/issues/4558))
5859

60+
5961
### Deprecated
6062

6163
### Removed

server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,7 @@ public SearchBackpressureStats nodeStats() {
399399
SearchTaskStats searchTaskStats = new SearchTaskStats(
400400
searchBackpressureStates.get(SearchTask.class).getCancellationCount(),
401401
searchBackpressureStates.get(SearchTask.class).getLimitReachedCount(),
402+
searchBackpressureStates.get(SearchTask.class).getCompletionCount(),
402403
taskTrackers.get(SearchTask.class)
403404
.stream()
404405
.collect(Collectors.toUnmodifiableMap(t -> TaskResourceUsageTrackerType.fromName(t.name()), t -> t.stats(searchTasks)))
@@ -407,6 +408,7 @@ public SearchBackpressureStats nodeStats() {
407408
SearchShardTaskStats searchShardTaskStats = new SearchShardTaskStats(
408409
searchBackpressureStates.get(SearchShardTask.class).getCancellationCount(),
409410
searchBackpressureStates.get(SearchShardTask.class).getLimitReachedCount(),
411+
searchBackpressureStates.get(SearchShardTask.class).getCompletionCount(),
410412
taskTrackers.get(SearchShardTask.class)
411413
.stream()
412414
.collect(Collectors.toUnmodifiableMap(t -> TaskResourceUsageTrackerType.fromName(t.name()), t -> t.stats(searchShardTasks)))

server/src/main/java/org/opensearch/search/backpressure/stats/SearchShardTaskStats.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.search.backpressure.stats;
1010

11+
import org.opensearch.Version;
1112
import org.opensearch.common.collect.MapBuilder;
1213
import org.opensearch.core.common.io.stream.StreamInput;
1314
import org.opensearch.core.common.io.stream.StreamOutput;
@@ -30,21 +31,29 @@
3031
public class SearchShardTaskStats implements ToXContentObject, Writeable {
3132
private final long cancellationCount;
3233
private final long limitReachedCount;
34+
private final long completionCount;
3335
private final Map<TaskResourceUsageTrackerType, TaskResourceUsageTracker.Stats> resourceUsageTrackerStats;
3436

3537
public SearchShardTaskStats(
3638
long cancellationCount,
3739
long limitReachedCount,
40+
long completionCount,
3841
Map<TaskResourceUsageTrackerType, TaskResourceUsageTracker.Stats> resourceUsageTrackerStats
3942
) {
4043
this.cancellationCount = cancellationCount;
4144
this.limitReachedCount = limitReachedCount;
45+
this.completionCount = completionCount;
4246
this.resourceUsageTrackerStats = resourceUsageTrackerStats;
4347
}
4448

4549
public SearchShardTaskStats(StreamInput in) throws IOException {
4650
this.cancellationCount = in.readVLong();
4751
this.limitReachedCount = in.readVLong();
52+
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
53+
completionCount = in.readVLong();
54+
} else {
55+
completionCount = -1;
56+
}
4857

4958
MapBuilder<TaskResourceUsageTrackerType, TaskResourceUsageTracker.Stats> builder = new MapBuilder<>();
5059
builder.put(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, in.readOptionalWriteable(CpuUsageTracker.Stats::new));
@@ -62,6 +71,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
6271
builder.field(entry.getKey().getName(), entry.getValue());
6372
}
6473
builder.endObject();
74+
if (completionCount != -1) {
75+
builder.field("completion_count", completionCount);
76+
}
6577

6678
builder.startObject("cancellation_stats")
6779
.field("cancellation_count", cancellationCount)
@@ -75,6 +87,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
7587
public void writeTo(StreamOutput out) throws IOException {
7688
out.writeVLong(cancellationCount);
7789
out.writeVLong(limitReachedCount);
90+
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
91+
out.writeVLong(completionCount);
92+
}
7893

7994
out.writeOptionalWriteable(resourceUsageTrackerStats.get(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER));
8095
out.writeOptionalWriteable(resourceUsageTrackerStats.get(TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER));
@@ -88,11 +103,12 @@ public boolean equals(Object o) {
88103
SearchShardTaskStats that = (SearchShardTaskStats) o;
89104
return cancellationCount == that.cancellationCount
90105
&& limitReachedCount == that.limitReachedCount
106+
&& completionCount == that.completionCount
91107
&& resourceUsageTrackerStats.equals(that.resourceUsageTrackerStats);
92108
}
93109

94110
@Override
95111
public int hashCode() {
96-
return Objects.hash(cancellationCount, limitReachedCount, resourceUsageTrackerStats);
112+
return Objects.hash(cancellationCount, limitReachedCount, resourceUsageTrackerStats, completionCount);
97113
}
98114
}

server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.search.backpressure.stats;
1010

11+
import org.opensearch.Version;
1112
import org.opensearch.common.collect.MapBuilder;
1213
import org.opensearch.core.common.io.stream.StreamInput;
1314
import org.opensearch.core.common.io.stream.StreamOutput;
@@ -31,21 +32,29 @@
3132
public class SearchTaskStats implements ToXContentObject, Writeable {
3233
private final long cancellationCount;
3334
private final long limitReachedCount;
35+
private final long completionCount;
3436
private final Map<TaskResourceUsageTrackerType, TaskResourceUsageTracker.Stats> resourceUsageTrackerStats;
3537

3638
public SearchTaskStats(
3739
long cancellationCount,
3840
long limitReachedCount,
41+
long completionCount,
3942
Map<TaskResourceUsageTrackerType, TaskResourceUsageTracker.Stats> resourceUsageTrackerStats
4043
) {
4144
this.cancellationCount = cancellationCount;
4245
this.limitReachedCount = limitReachedCount;
46+
this.completionCount = completionCount;
4347
this.resourceUsageTrackerStats = resourceUsageTrackerStats;
4448
}
4549

4650
public SearchTaskStats(StreamInput in) throws IOException {
4751
this.cancellationCount = in.readVLong();
4852
this.limitReachedCount = in.readVLong();
53+
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
54+
this.completionCount = in.readVLong();
55+
} else {
56+
this.completionCount = -1;
57+
}
4958

5059
MapBuilder<TaskResourceUsageTrackerType, TaskResourceUsageTracker.Stats> builder = new MapBuilder<>();
5160
builder.put(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, in.readOptionalWriteable(CpuUsageTracker.Stats::new));
@@ -63,6 +72,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
6372
builder.field(entry.getKey().getName(), entry.getValue());
6473
}
6574
builder.endObject();
75+
if (completionCount != -1) {
76+
builder.field("completion_count", completionCount);
77+
}
6678

6779
builder.startObject("cancellation_stats")
6880
.field("cancellation_count", cancellationCount)
@@ -76,6 +88,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
7688
public void writeTo(StreamOutput out) throws IOException {
7789
out.writeVLong(cancellationCount);
7890
out.writeVLong(limitReachedCount);
91+
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
92+
out.writeVLong(completionCount);
93+
}
7994

8095
out.writeOptionalWriteable(resourceUsageTrackerStats.get(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER));
8196
out.writeOptionalWriteable(resourceUsageTrackerStats.get(TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER));
@@ -89,11 +104,12 @@ public boolean equals(Object o) {
89104
SearchTaskStats that = (SearchTaskStats) o;
90105
return cancellationCount == that.cancellationCount
91106
&& limitReachedCount == that.limitReachedCount
107+
&& completionCount == that.completionCount
92108
&& resourceUsageTrackerStats.equals(that.resourceUsageTrackerStats);
93109
}
94110

95111
@Override
96112
public int hashCode() {
97-
return Objects.hash(cancellationCount, limitReachedCount, resourceUsageTrackerStats);
113+
return Objects.hash(cancellationCount, limitReachedCount, resourceUsageTrackerStats, completionCount);
98114
}
99115
}

server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -249,10 +249,11 @@ public void testSearchTaskInFlightCancellation() {
249249
verify(mockTaskManager, times(10)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any());
250250
assertEquals(3, service.getSearchBackpressureState(SearchTask.class).getLimitReachedCount());
251251

252-
// Verify search backpressure stats.
252+
// Verify search backpressure stats. Since we are not marking any task as completed the completionCount will be 0
253+
// for SearchTaskStats here.
253254
SearchBackpressureStats expectedStats = new SearchBackpressureStats(
254-
new SearchTaskStats(10, 3, Map.of(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, new MockStats(10))),
255-
new SearchShardTaskStats(0, 0, Collections.emptyMap()),
255+
new SearchTaskStats(10, 3, 0, Map.of(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, new MockStats(10))),
256+
new SearchShardTaskStats(0, 0, 0, Collections.emptyMap()),
256257
SearchBackpressureMode.ENFORCED
257258
);
258259
SearchBackpressureStats actualStats = service.nodeStats();
@@ -323,10 +324,11 @@ public void testSearchShardTaskInFlightCancellation() {
323324
verify(mockTaskManager, times(12)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any());
324325
assertEquals(3, service.getSearchBackpressureState(SearchShardTask.class).getLimitReachedCount());
325326

326-
// Verify search backpressure stats.
327+
// Verify search backpressure stats. We are marking 20 SearchShardTasks as completed this should get
328+
// reflected in SearchShardTaskStats.
327329
SearchBackpressureStats expectedStats = new SearchBackpressureStats(
328-
new SearchTaskStats(0, 0, Collections.emptyMap()),
329-
new SearchShardTaskStats(12, 3, Map.of(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, new MockStats(12))),
330+
new SearchTaskStats(0, 0, 0, Collections.emptyMap()),
331+
new SearchShardTaskStats(12, 3, 20, Map.of(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, new MockStats(12))),
330332
SearchBackpressureMode.ENFORCED
331333
);
332334
SearchBackpressureStats actualStats = service.nodeStats();

server/src/test/java/org/opensearch/search/backpressure/stats/SearchShardTaskStatsTests.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ public static SearchShardTaskStats randomInstance() {
3939
new ElapsedTimeTracker.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())
4040
);
4141

42-
return new SearchShardTaskStats(randomNonNegativeLong(), randomNonNegativeLong(), resourceUsageTrackerStats);
42+
return new SearchShardTaskStats(
43+
randomNonNegativeLong(),
44+
randomNonNegativeLong(),
45+
randomNonNegativeLong(),
46+
resourceUsageTrackerStats
47+
);
4348
}
4449
}

server/src/test/java/org/opensearch/search/backpressure/stats/SearchTaskStatsTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,6 @@ public static SearchTaskStats randomInstance() {
4040
new ElapsedTimeTracker.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())
4141
);
4242

43-
return new SearchTaskStats(randomNonNegativeLong(), randomNonNegativeLong(), resourceUsageTrackerStats);
43+
return new SearchTaskStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), resourceUsageTrackerStats);
4444
}
4545
}

0 commit comments

Comments
 (0)