Skip to content

Commit e1826c2

Browse files
authored
Added time_in_execution attribute to /_cluster/pending_tasks response (#17780)
Signed-off-by: Manik Garg <[email protected]>
1 parent e21e7ec commit e1826c2

File tree

7 files changed

+81
-7
lines changed

7 files changed

+81
-7
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3333
- [repository-s3] Add support for SSE-KMS and S3 bucket owner verification ([#18312](https://github.com/opensearch-project/OpenSearch/pull/18312))
3434
- Optimize gRPC perf by passing by reference ([#18303](https://github.com/opensearch-project/OpenSearch/pull/18303))
3535
- Added File Cache Stats - Involves Block level as well as full file level stats ([#17538](https://github.com/opensearch-project/OpenSearch/pull/17538), [#17479](https://github.com/opensearch-project/OpenSearch/issues/17479))
36+
- Added time_in_execution attribute to /_cluster/pending_tasks response ([#17780](https://github.com/opensearch-project/OpenSearch/pull/17780))
3637
- Added File Cache Pinning ([#17617](https://github.com/opensearch-project/OpenSearch/pull/17617), [#13648](https://github.com/opensearch-project/OpenSearch/issues/13648))
3738
- Support consumer reset in Resume API for pull-based ingestion. This PR includes a breaking change for the experimental pull-based ingestion feature. ([#18332](https://github.com/opensearch-project/OpenSearch/pull/18332))
3839
- Add FIPS build tooling ([#4254](https://github.com/opensearch-project/security/issues/4254))
@@ -42,6 +43,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4243
- Change implementation for `percentiles` aggregation for latency improvement ([#18124](https://github.com/opensearch-project/OpenSearch/pull/18124))
4344
- Update FipsMode check to catch NoSuchMethodError ([#18427](https://github.com/opensearch-project/OpenSearch/pull/18427))
4445

46+
4547
### Dependencies
4648
- Update Apache Lucene from 10.1.0 to 10.2.1 ([#17961](https://github.com/opensearch-project/OpenSearch/pull/17961))
4749
- Bump `com.google.code.gson:gson` from 2.12.1 to 2.13.1 ([#17923](https://github.com/opensearch-project/OpenSearch/pull/17923), [#18266](https://github.com/opensearch-project/OpenSearch/pull/18266))

server/src/internalClusterTest/java/org/opensearch/cluster/service/ClusterServiceIT.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,8 +351,9 @@ public void testPendingUpdateTask() throws Exception {
351351
clusterService.submitStateUpdateTask("1", new ClusterStateUpdateTask() {
352352
@Override
353353
public ClusterState execute(ClusterState currentState) {
354-
invoked1.countDown();
355354
try {
355+
Thread.sleep(50);
356+
invoked1.countDown();
356357
block1.await();
357358
} catch (InterruptedException e) {
358359
fail();
@@ -395,6 +396,9 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
395396
assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(10));
396397
assertThat(pendingClusterTasks.get(0).getSource().string(), equalTo("1"));
397398
assertThat(pendingClusterTasks.get(0).isExecuting(), equalTo(true));
399+
assertThat(pendingClusterTasks.get(0).getTimeInExecutionInMillis(), greaterThan(0L));
400+
assertThat(pendingClusterTasks.get(1).isExecuting(), equalTo(false));
401+
assertThat(pendingClusterTasks.get(1).getTimeInExecutionInMillis(), equalTo(0L));
398402
for (PendingClusterTask task : pendingClusterTasks) {
399403
controlSources.remove(task.getSource().string());
400404
}
@@ -405,6 +409,9 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
405409
assertThat(response.pendingTasks().size(), greaterThanOrEqualTo(10));
406410
assertThat(response.pendingTasks().get(0).getSource().string(), equalTo("1"));
407411
assertThat(response.pendingTasks().get(0).isExecuting(), equalTo(true));
412+
assertThat(response.pendingTasks().get(0).getTimeInExecutionInMillis(), greaterThan(0L));
413+
assertThat(response.pendingTasks().get(1).isExecuting(), equalTo(false));
414+
assertThat(response.pendingTasks().get(1).getTimeInExecutionInMillis(), equalTo(0L));
408415
for (PendingClusterTask task : response) {
409416
controlSources.remove(task.getSource().string());
410417
}

server/src/main/java/org/opensearch/action/admin/cluster/tasks/PendingClusterTasksResponse.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ public String toString() {
9191
.append(pendingClusterTask.getSource())
9292
.append("/")
9393
.append(pendingClusterTask.getTimeInQueue())
94+
.append("/")
95+
.append(pendingClusterTask.getTimeInExecution())
9496
.append("\n");
9597
}
9698
return sb.toString();
@@ -108,6 +110,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
108110
builder.field(Fields.EXECUTING, pendingClusterTask.isExecuting());
109111
builder.field(Fields.TIME_IN_QUEUE_MILLIS, pendingClusterTask.getTimeInQueueInMillis());
110112
builder.field(Fields.TIME_IN_QUEUE, pendingClusterTask.getTimeInQueue());
113+
builder.field(Fields.TIME_IN_EXECUTION_MILLIS, pendingClusterTask.getTimeInExecutionInMillis());
114+
builder.field(Fields.TIME_IN_EXECUTION, pendingClusterTask.getTimeInExecution());
111115
builder.endObject();
112116
}
113117
builder.endArray();
@@ -129,6 +133,8 @@ static final class Fields {
129133
static final String SOURCE = "source";
130134
static final String TIME_IN_QUEUE_MILLIS = "time_in_queue_millis";
131135
static final String TIME_IN_QUEUE = "time_in_queue";
136+
static final String TIME_IN_EXECUTION_MILLIS = "time_in_execution_millis";
137+
static final String TIME_IN_EXECUTION = "time_in_execution";
132138

133139
}
134140

server/src/main/java/org/opensearch/cluster/service/ClusterManagerService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -609,7 +609,8 @@ public List<PendingClusterTask> pendingTasks() {
609609
pending.priority,
610610
new Text(task.source()),
611611
task.getAgeInMillis(),
612-
pending.executing
612+
pending.executing,
613+
pending.executionTimeInMillis
613614
);
614615
}).collect(Collectors.toList());
615616
}

server/src/main/java/org/opensearch/cluster/service/PendingClusterTask.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
package org.opensearch.cluster.service;
3434

35+
import org.opensearch.Version;
3536
import org.opensearch.common.Priority;
3637
import org.opensearch.common.annotation.PublicApi;
3738
import org.opensearch.common.unit.TimeValue;
@@ -55,23 +56,29 @@ public class PendingClusterTask implements Writeable {
5556
private Text source;
5657
private long timeInQueue;
5758
private boolean executing;
59+
private long timeInExecution;
5860

5961
public PendingClusterTask(StreamInput in) throws IOException {
6062
insertOrder = in.readVLong();
6163
priority = Priority.readFrom(in);
6264
source = in.readText();
6365
timeInQueue = in.readLong();
6466
executing = in.readBoolean();
67+
if (in.getVersion().onOrAfter(Version.V_3_1_0)) {
68+
timeInExecution = in.readLong();
69+
}
6570
}
6671

67-
public PendingClusterTask(long insertOrder, Priority priority, Text source, long timeInQueue, boolean executing) {
72+
public PendingClusterTask(long insertOrder, Priority priority, Text source, long timeInQueue, boolean executing, long timeInExecution) {
6873
assert timeInQueue >= 0 : "got a negative timeInQueue [" + timeInQueue + "]";
6974
assert insertOrder >= 0 : "got a negative insertOrder [" + insertOrder + "]";
75+
assert timeInExecution >= 0 : "got a negative timeInExecution [" + timeInExecution + "]";
7076
this.insertOrder = insertOrder;
7177
this.priority = priority;
7278
this.source = source;
7379
this.timeInQueue = timeInQueue;
7480
this.executing = executing;
81+
this.timeInExecution = timeInExecution;
7582
}
7683

7784
public long getInsertOrder() {
@@ -90,10 +97,18 @@ public long getTimeInQueueInMillis() {
9097
return timeInQueue;
9198
}
9299

100+
public long getTimeInExecutionInMillis() {
101+
return timeInExecution;
102+
}
103+
93104
public TimeValue getTimeInQueue() {
94105
return new TimeValue(getTimeInQueueInMillis());
95106
}
96107

108+
public TimeValue getTimeInExecution() {
109+
return new TimeValue(getTimeInExecutionInMillis());
110+
}
111+
97112
public boolean isExecuting() {
98113
return executing;
99114
}
@@ -105,5 +120,8 @@ public void writeTo(StreamOutput out) throws IOException {
105120
out.writeText(source);
106121
out.writeLong(timeInQueue);
107122
out.writeBoolean(executing);
123+
if (out.getVersion().onOrAfter(Version.V_3_1_0)) {
124+
out.writeLong(timeInExecution);
125+
}
108126
}
109127
}

server/src/main/java/org/opensearch/common/util/concurrent/PrioritizedOpenSearchThreadPoolExecutor.java

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import java.util.List;
3939
import java.util.Queue;
4040
import java.util.concurrent.Callable;
41+
import java.util.concurrent.ConcurrentHashMap;
42+
import java.util.concurrent.ConcurrentMap;
4143
import java.util.concurrent.FutureTask;
4244
import java.util.concurrent.PriorityBlockingQueue;
4345
import java.util.concurrent.RunnableFuture;
@@ -62,6 +64,7 @@ public class PrioritizedOpenSearchThreadPoolExecutor extends OpenSearchThreadPoo
6264
private final AtomicLong insertionOrder = new AtomicLong();
6365
private final Queue<Runnable> current = ConcurrentCollections.newQueue();
6466
private final ScheduledExecutorService timer;
67+
private final ConcurrentMap<Runnable, TaskMetrics> taskMetrics = new ConcurrentHashMap<>();
6568

6669
public PrioritizedOpenSearchThreadPoolExecutor(
6770
String name,
@@ -114,6 +117,15 @@ public TimeValue getMaxTaskWaitTime() {
114117

115118
private void addPending(List<Runnable> runnables, List<Pending> pending, boolean executing) {
116119
for (Runnable runnable : runnables) {
120+
long executionTimeInMillis = 0;
121+
122+
if (executing) {
123+
TaskMetrics metrics = taskMetrics.get(runnable);
124+
if (metrics != null) {
125+
executionTimeInMillis = metrics.getExecutionTimeMillis();
126+
}
127+
}
128+
117129
if (runnable instanceof TieBreakingPrioritizedRunnable) {
118130
TieBreakingPrioritizedRunnable t = (TieBreakingPrioritizedRunnable) runnable;
119131
Runnable innerRunnable = t.runnable;
@@ -122,28 +134,30 @@ private void addPending(List<Runnable> runnables, List<Pending> pending, boolean
122134
innerRunnable can be null if task is finished but not removed from executor yet,
123135
see {@link TieBreakingPrioritizedRunnable#run} and {@link TieBreakingPrioritizedRunnable#runAndClean}
124136
*/
125-
pending.add(new Pending(super.unwrap(innerRunnable), t.priority(), t.insertionOrder, executing));
137+
pending.add(new Pending(super.unwrap(innerRunnable), t.priority(), t.insertionOrder, executing, executionTimeInMillis));
126138
}
127139
} else if (runnable instanceof PrioritizedFutureTask) {
128140
PrioritizedFutureTask t = (PrioritizedFutureTask) runnable;
129141
Object task = t.task;
130142
if (t.task instanceof Runnable) {
131143
task = super.unwrap((Runnable) t.task);
132144
}
133-
pending.add(new Pending(task, t.priority, t.insertionOrder, executing));
145+
pending.add(new Pending(task, t.priority, t.insertionOrder, executing, executionTimeInMillis));
134146
}
135147
}
136148
}
137149

138150
@Override
139151
protected void beforeExecute(Thread t, Runnable r) {
140152
current.add(r);
153+
taskMetrics.put(r, new TaskMetrics());
141154
}
142155

143156
@Override
144157
protected void afterExecute(Runnable r, Throwable t) {
145158
super.afterExecute(r, t);
146159
current.remove(r);
160+
taskMetrics.remove(r); // Clean up metrics when task completes
147161
}
148162

149163
public void execute(Runnable command, final TimeValue timeout, final Runnable timeoutCallback) {
@@ -211,12 +225,14 @@ public static class Pending {
211225
public final Priority priority;
212226
public final long insertionOrder;
213227
public final boolean executing;
228+
public final long executionTimeInMillis;
214229

215-
public Pending(Object task, Priority priority, long insertionOrder, boolean executing) {
230+
public Pending(Object task, Priority priority, long insertionOrder, boolean executing, long executionTimeInMillis) {
216231
this.task = task;
217232
this.priority = priority;
218233
this.insertionOrder = insertionOrder;
219234
this.executing = executing;
235+
this.executionTimeInMillis = executionTimeInMillis;
220236
}
221237
}
222238

@@ -291,7 +307,6 @@ private void runAndClean(Runnable run) {
291307
public Runnable unwrap() {
292308
return runnable;
293309
}
294-
295310
}
296311

297312
private static final class PrioritizedFutureTask<T> extends FutureTask<T> implements Comparable<PrioritizedFutureTask> {
@@ -324,4 +339,24 @@ public int compareTo(PrioritizedFutureTask pft) {
324339
}
325340
}
326341

342+
/**
343+
* Generic class to track various task metrics.
344+
* This implementation tracks task execution time, but can be extended
345+
* to include additional metrics.
346+
*/
347+
private static class TaskMetrics {
348+
private final long startTimeNanos;
349+
350+
TaskMetrics() {
351+
this.startTimeNanos = System.nanoTime();
352+
}
353+
354+
/**
355+
* Get the task execution time in milliseconds.
356+
*/
357+
long getExecutionTimeMillis() {
358+
return TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNanos, TimeUnit.NANOSECONDS);
359+
}
360+
}
361+
327362
}

server/src/test/java/org/opensearch/common/util/concurrent/PrioritizedExecutorsTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import java.util.concurrent.atomic.AtomicBoolean;
5353

5454
import static org.hamcrest.Matchers.equalTo;
55+
import static org.hamcrest.Matchers.greaterThan;
5556
import static org.hamcrest.Matchers.is;
5657

5758
public class PrioritizedExecutorsTests extends OpenSearchTestCase {
@@ -231,6 +232,7 @@ public void testTimeout() throws Exception {
231232
@Override
232233
public void run() {
233234
try {
235+
Thread.sleep(50);
234236
invoked.countDown();
235237
block.await();
236238
} catch (InterruptedException e) {
@@ -248,6 +250,7 @@ public String toString() {
248250
assertThat(pending.length, equalTo(1));
249251
assertThat(pending[0].task.toString(), equalTo("the blocking"));
250252
assertThat(pending[0].executing, equalTo(true));
253+
assertThat(pending[0].executionTimeInMillis, greaterThan(0L));
251254

252255
final AtomicBoolean executeCalled = new AtomicBoolean();
253256
final CountDownLatch timedOut = new CountDownLatch(1);
@@ -272,8 +275,10 @@ public void run() {
272275
assertThat(pending.length, equalTo(2));
273276
assertThat(pending[0].task.toString(), equalTo("the blocking"));
274277
assertThat(pending[0].executing, equalTo(true));
278+
assertThat(pending[0].executionTimeInMillis, greaterThan(0L));
275279
assertThat(pending[1].task.toString(), equalTo("the waiting"));
276280
assertThat(pending[1].executing, equalTo(false));
281+
assertThat(pending[1].executionTimeInMillis, equalTo(0L));
277282

278283
assertThat(timedOut.await(2, TimeUnit.SECONDS), equalTo(true));
279284
block.countDown();

0 commit comments

Comments
 (0)