Skip to content

Commit a6c40f7

Browse files
committed
Rewritten the logic using accumlator as proposed
Signed-off-by: Sriram Ganesh <[email protected]>
1 parent d723db8 commit a6c40f7

File tree

7 files changed

+183
-5
lines changed

7 files changed

+183
-5
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.metrics;
10+
11+
import java.util.concurrent.atomic.LongAccumulator;
12+
13+
/**
14+
* A metric for tracking the maximum value seen.
15+
*
16+
* @opensearch.internal
17+
*/
18+
public class MaxMetric implements Metric {
19+
private final LongAccumulator max = new LongAccumulator(Long::max, Long.MIN_VALUE);
20+
21+
public void collect(long value) {
22+
max.accumulate(value);
23+
}
24+
25+
public long get() {
26+
return max.get();
27+
}
28+
29+
public void clear() {
30+
max.reset();
31+
}
32+
}

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,7 @@ public IndexShard(
439439
);
440440
this.mapperService = mapperService;
441441
this.indexCache = indexCache;
442-
this.internalIndexingStats = new InternalIndexingStats();
442+
this.internalIndexingStats = new InternalIndexingStats(threadPool);
443443
final List<IndexingOperationListener> listenersList = new ArrayList<>(listeners);
444444
listenersList.add(internalIndexingStats);
445445
this.indexingOperationListeners = new IndexingOperationListener.CompositeListener(listenersList, logger);

server/src/main/java/org/opensearch/index/shard/IndexingStats.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ public void writeTo(StreamOutput out) throws IOException {
159159
private long throttleTimeInMillis;
160160
private boolean isThrottled;
161161
private final DocStatusStats docStatusStats;
162+
private long maxLastIndexRequestTimestamp;
162163

163164
Stats() {
164165
docStatusStats = new DocStatusStats();
@@ -175,6 +176,7 @@ public Stats(StreamInput in) throws IOException {
175176
noopUpdateCount = in.readVLong();
176177
isThrottled = in.readBoolean();
177178
throttleTimeInMillis = in.readLong();
179+
maxLastIndexRequestTimestamp = in.readLong();
178180

179181
if (in.getVersion().onOrAfter(Version.V_2_11_0)) {
180182
docStatusStats = in.readOptionalWriteable(DocStatusStats::new);
@@ -194,7 +196,8 @@ public Stats(
194196
long noopUpdateCount,
195197
boolean isThrottled,
196198
long throttleTimeInMillis,
197-
DocStatusStats docStatusStats
199+
DocStatusStats docStatusStats,
200+
long maxLastIndexRequestTimestamp
198201
) {
199202
this.indexCount = indexCount;
200203
this.indexTimeInMillis = indexTimeInMillis;
@@ -207,6 +210,7 @@ public Stats(
207210
this.isThrottled = isThrottled;
208211
this.throttleTimeInMillis = throttleTimeInMillis;
209212
this.docStatusStats = docStatusStats;
213+
this.maxLastIndexRequestTimestamp = maxLastIndexRequestTimestamp;
210214
}
211215

212216
public void add(Stats stats) {
@@ -226,6 +230,8 @@ public void add(Stats stats) {
226230
if (getDocStatusStats() != null) {
227231
getDocStatusStats().add(stats.getDocStatusStats());
228232
}
233+
// Aggregate maxLastIndexRequestTimestamp using Math.max
234+
maxLastIndexRequestTimestamp = Math.max(maxLastIndexRequestTimestamp, stats.maxLastIndexRequestTimestamp);
229235
}
230236

231237
/**
@@ -299,6 +305,10 @@ public DocStatusStats getDocStatusStats() {
299305
return docStatusStats;
300306
}
301307

308+
public long getMaxLastIndexRequestTimestamp() {
309+
return maxLastIndexRequestTimestamp;
310+
}
311+
302312
@Override
303313
public void writeTo(StreamOutput out) throws IOException {
304314
out.writeVLong(indexCount);
@@ -311,6 +321,7 @@ public void writeTo(StreamOutput out) throws IOException {
311321
out.writeVLong(noopUpdateCount);
312322
out.writeBoolean(isThrottled);
313323
out.writeLong(throttleTimeInMillis);
324+
out.writeLong(maxLastIndexRequestTimestamp);
314325

315326
if (out.getVersion().onOrAfter(Version.V_2_11_0)) {
316327
out.writeOptionalWriteable(docStatusStats);

server/src/main/java/org/opensearch/index/shard/InternalIndexingStats.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,11 @@
3333
package org.opensearch.index.shard;
3434

3535
import org.opensearch.common.metrics.CounterMetric;
36+
import org.opensearch.common.metrics.MaxMetric;
3637
import org.opensearch.common.metrics.MeanMetric;
3738
import org.opensearch.core.index.shard.ShardId;
3839
import org.opensearch.index.engine.Engine;
40+
import org.opensearch.threadpool.ThreadPool;
3941

4042
import java.util.concurrent.TimeUnit;
4143

@@ -47,6 +49,11 @@
4749
*/
4850
final class InternalIndexingStats implements IndexingOperationListener {
4951
private final StatsHolder totalStats = new StatsHolder();
52+
private final ThreadPool threadPool;
53+
54+
InternalIndexingStats(ThreadPool threadPool) {
55+
this.threadPool = threadPool;
56+
}
5057

5158
/**
5259
* Returns the stats, including type specific stats. If the types are null/0 length, then nothing
@@ -74,6 +81,8 @@ public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult re
7481
long took = result.getTook();
7582
totalStats.indexMetric.inc(took);
7683
totalStats.indexCurrent.dec();
84+
long now = threadPool.absoluteTimeInMillis();
85+
totalStats.maxLastIndexRequestTimestamp.collect(now);
7786
}
7887
break;
7988
case FAILURE:
@@ -142,6 +151,7 @@ static class StatsHolder {
142151
private final CounterMetric indexFailed = new CounterMetric();
143152
private final CounterMetric deleteCurrent = new CounterMetric();
144153
private final CounterMetric noopUpdates = new CounterMetric();
154+
private final MaxMetric maxLastIndexRequestTimestamp = new MaxMetric();
145155

146156
IndexingStats.Stats stats(boolean isThrottled, long currentThrottleMillis) {
147157
return new IndexingStats.Stats(
@@ -155,7 +165,8 @@ IndexingStats.Stats stats(boolean isThrottled, long currentThrottleMillis) {
155165
noopUpdates.count(),
156166
isThrottled,
157167
TimeUnit.MILLISECONDS.toMillis(currentThrottleMillis),
158-
new IndexingStats.Stats.DocStatusStats()
168+
new IndexingStats.Stats.DocStatusStats(),
169+
maxLastIndexRequestTimestamp.get()
159170
);
160171
}
161172
}

server/src/main/java/org/opensearch/rest/action/cat/RestIndicesAction.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -771,6 +771,15 @@ protected Table getTableWithHeader(final RestRequest request, final PageToken pa
771771

772772
table.addCell("search.throttled", "alias:sth;default:false;desc:indicates if the index is search throttled");
773773

774+
table.addCell(
775+
"last_index_request_timestamp",
776+
"alias:last_index_ts,lastIndexRequestTimestamp;default:false;text-align:right;desc:timestamp of the last processed index request (epoch millis)"
777+
);
778+
table.addCell(
779+
"last_index_request_timestamp_string",
780+
"alias:last_index_ts_string,lastIndexRequestTimestampString;default:false;text-align:right;desc:timestamp of the last processed index request (ISO8601 string)"
781+
);
782+
774783
table.endHeaders();
775784
return table;
776785
}
@@ -1058,8 +1067,13 @@ protected Table buildTable(
10581067

10591068
table.addCell(searchThrottled);
10601069

1061-
table.endRow();
1070+
table.addCell(totalStats.getIndexing() == null ? null : totalStats.getIndexing().getTotal().getMaxLastIndexRequestTimestamp());
1071+
Long ts = totalStats.getIndexing() == null ? null : totalStats.getIndexing().getTotal().getMaxLastIndexRequestTimestamp();
1072+
table.addCell(
1073+
ts == null || ts == 0 ? null : STRICT_DATE_TIME_FORMATTER.format(Instant.ofEpochMilli(ts).atZone(ZoneOffset.UTC))
1074+
);
10621075

1076+
table.endRow();
10631077
}
10641078

10651079
return table;

server/src/test/java/org/opensearch/index/shard/IndexingStatsTests.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,47 @@ public void testToXContentForIndexingStats() throws IOException {
115115
assertEquals(expected, xContentBuilder.toString());
116116
}
117117

118+
/**
119+
* Tests aggregation logic for maxLastIndexRequestTimestamp in IndexingStats.Stats.
120+
* Uses reflection because the field is private and not settable via public API.
121+
* This ensures that aggregation (add) always surfaces the maximum value, even across multiple adds and random values.
122+
*/
123+
public void testMaxLastIndexRequestTimestampAggregation() throws Exception {
124+
IndexingStats.Stats stats1 = new IndexingStats.Stats();
125+
IndexingStats.Stats stats2 = new IndexingStats.Stats();
126+
IndexingStats.Stats stats3 = new IndexingStats.Stats();
127+
java.lang.reflect.Field tsField = IndexingStats.Stats.class.getDeclaredField("maxLastIndexRequestTimestamp");
128+
tsField.setAccessible(true);
129+
130+
// Use random values for robustness
131+
long ts1 = randomLongBetween(0, 1000000);
132+
long ts2 = randomLongBetween(0, 1000000);
133+
long ts3 = randomLongBetween(0, 1000000);
134+
135+
tsField.set(stats1, ts1);
136+
tsField.set(stats2, ts2);
137+
tsField.set(stats3, ts3);
138+
139+
// Aggregate stats1 + stats2
140+
stats1.add(stats2);
141+
assertEquals(Math.max(ts1, ts2), stats1.getMaxLastIndexRequestTimestamp());
142+
143+
// Aggregate stats1 + stats3
144+
stats1.add(stats3);
145+
assertEquals(Math.max(Math.max(ts1, ts2), ts3), stats1.getMaxLastIndexRequestTimestamp());
146+
147+
// Test with zero and negative values
148+
tsField.set(stats1, 0L);
149+
tsField.set(stats2, -100L);
150+
stats1.add(stats2);
151+
assertEquals(0L, stats1.getMaxLastIndexRequestTimestamp());
152+
153+
tsField.set(stats1, -50L);
154+
tsField.set(stats2, -100L);
155+
stats1.add(stats2);
156+
assertEquals(-50L, stats1.getMaxLastIndexRequestTimestamp());
157+
}
158+
118159
private IndexingStats createTestInstance() {
119160
IndexingStats.Stats.DocStatusStats docStatusStats = new IndexingStats.Stats.DocStatusStats();
120161
for (int i = 1; i < 6; ++i) {
@@ -132,7 +173,8 @@ private IndexingStats createTestInstance() {
132173
randomNonNegativeLong(),
133174
randomBoolean(),
134175
randomNonNegativeLong(),
135-
docStatusStats
176+
docStatusStats,
177+
randomNonNegativeLong()
136178
);
137179

138180
return new IndexingStats(stats);

server/src/test/java/org/opensearch/rest/action/cat/RestIndicesActionTests.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,14 @@ private void assertTableHeaders(Table table) {
209209
assertThat(headers.get(3).value, equalTo("uuid"));
210210
assertThat(headers.get(4).value, equalTo("pri"));
211211
assertThat(headers.get(5).value, equalTo("rep"));
212+
// Check for new columns (at the end)
213+
boolean foundRaw = false, foundString = false;
214+
for (Table.Cell cell : headers) {
215+
if ("last_index_request_timestamp".equals(cell.value)) foundRaw = true;
216+
if ("last_index_request_timestamp_string".equals(cell.value)) foundString = true;
217+
}
218+
assertTrue(foundRaw);
219+
assertTrue(foundString);
212220
}
213221

214222
private void assertTableRows(Table table) {
@@ -241,4 +249,64 @@ private void assertTableRows(Table table) {
241249
}
242250
}
243251
}
252+
253+
public void testLastIndexRequestTimestampColumns() {
254+
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
255+
final Settings settings = Settings.builder().build();
256+
final ResponseLimitSettings responseLimitSettings = new ResponseLimitSettings(clusterSettings, settings);
257+
final RestIndicesAction action = new RestIndicesAction(responseLimitSettings);
258+
// Setup a known timestamp
259+
long knownTs = 1710000000000L;
260+
IndexStats indexStats = mock(IndexStats.class);
261+
CommonStats commonStats = mock(CommonStats.class);
262+
org.opensearch.index.shard.IndexingStats indexingStats = mock(org.opensearch.index.shard.IndexingStats.class);
263+
org.opensearch.index.shard.IndexingStats.Stats stats = mock(org.opensearch.index.shard.IndexingStats.Stats.class);
264+
when(indexStats.getTotal()).thenReturn(commonStats);
265+
when(indexStats.getPrimaries()).thenReturn(commonStats);
266+
when(commonStats.getIndexing()).thenReturn(indexingStats);
267+
when(indexingStats.getTotal()).thenReturn(stats);
268+
when(stats.getMaxLastIndexRequestTimestamp()).thenReturn(knownTs);
269+
Map<String, IndexStats> testStats = new LinkedHashMap<>();
270+
String testIndex = "test-index";
271+
testStats.put(testIndex, indexStats);
272+
Map<String, Settings> testSettings = new LinkedHashMap<>();
273+
testSettings.put(testIndex, Settings.EMPTY);
274+
Map<String, IndexMetadata> testMetadatas = new LinkedHashMap<>();
275+
Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT).build();
276+
testMetadatas.put(
277+
testIndex,
278+
IndexMetadata.builder(testIndex).settings(indexSettings).numberOfShards(1).numberOfReplicas(0).build()
279+
);
280+
Map<String, ClusterIndexHealth> testHealths = new LinkedHashMap<>();
281+
Table table = action.buildTable(
282+
new FakeRestRequest(),
283+
testSettings,
284+
testHealths,
285+
testStats,
286+
testMetadatas,
287+
action.getTableIterator(new String[] { testIndex }, testSettings),
288+
null
289+
);
290+
// Find the columns
291+
List<Table.Cell> header = table.getHeaders();
292+
int rawIdx = -1, strIdx = -1;
293+
for (int i = 0; i < header.size(); i++) {
294+
if ("last_index_request_timestamp".equals(header.get(i).value)) rawIdx = i;
295+
if ("last_index_request_timestamp_string".equals(header.get(i).value)) strIdx = i;
296+
}
297+
assertTrue(rawIdx != -1);
298+
assertTrue(strIdx != -1);
299+
List<List<Table.Cell>> rows = table.getRows();
300+
assertEquals(1, rows.size());
301+
List<Table.Cell> row = rows.get(0);
302+
assertEquals(String.valueOf(knownTs), row.get(rawIdx).value.toString());
303+
// Robust: parse the string as ISO-8601 and compare to knownTs
304+
String timestampString = row.get(strIdx).value.toString();
305+
try {
306+
java.time.Instant parsed = java.time.Instant.parse(timestampString);
307+
assertEquals(knownTs, parsed.toEpochMilli());
308+
} catch (java.time.format.DateTimeParseException e) {
309+
fail("Timestamp string is not a valid ISO-8601 date: " + timestampString);
310+
}
311+
}
244312
}

0 commit comments

Comments
 (0)