Skip to content

Expose Last Index Request Timestamp in Cat Indices API #18405

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Approximation Framework Enhancement: Update the BKD traversal logic to improve the performance on skewed data ([#18439](https://github.com/opensearch-project/OpenSearch/issues/18439))
- Support system generated ingest pipelines for bulk update operations ([#18277](https://github.com/opensearch-project/OpenSearch/pull/18277)))
- Added FS Health Check Failure metric ([#18435](https://github.com/opensearch-project/OpenSearch/pull/18435))
- Added last index request timestamp columns to the `_cat/indices` API. ([10766](https://github.com/opensearch-project/OpenSearch/issues/10766))

### Changed
- Create generic DocRequest to better categorize ActionRequests ([#18269](https://github.com/opensearch-project/OpenSearch/pull/18269)))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.metrics;

import java.util.concurrent.atomic.LongAccumulator;

/**
* A metric for tracking the maximum value seen.
*
* @opensearch.internal
*/
public class MaxMetric implements Metric {
private final LongAccumulator max = new LongAccumulator(Long::max, Long.MIN_VALUE);

public void collect(long value) {
max.accumulate(value);
}

public long get() {
return max.get();
}

public void clear() {
max.reset();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ public IndexShard(
);
this.mapperService = mapperService;
this.indexCache = indexCache;
this.internalIndexingStats = new InternalIndexingStats();
this.internalIndexingStats = new InternalIndexingStats(threadPool);
final List<IndexingOperationListener> listenersList = new ArrayList<>(listeners);
listenersList.add(internalIndexingStats);
this.indexingOperationListeners = new IndexingOperationListener.CompositeListener(listenersList, logger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ public void writeTo(StreamOutput out) throws IOException {
private long throttleTimeInMillis;
private boolean isThrottled;
private final DocStatusStats docStatusStats;
private long maxLastIndexRequestTimestamp;

Stats() {
docStatusStats = new DocStatusStats();
Expand All @@ -175,7 +176,11 @@ public Stats(StreamInput in) throws IOException {
noopUpdateCount = in.readVLong();
isThrottled = in.readBoolean();
throttleTimeInMillis = in.readLong();

if (in.getVersion().onOrAfter(Version.V_3_1_0)) {
maxLastIndexRequestTimestamp = in.readVLong();
} else {
maxLastIndexRequestTimestamp = 0L;
}
if (in.getVersion().onOrAfter(Version.V_2_11_0)) {
docStatusStats = in.readOptionalWriteable(DocStatusStats::new);
} else {
Expand All @@ -194,7 +199,8 @@ public Stats(
long noopUpdateCount,
boolean isThrottled,
long throttleTimeInMillis,
DocStatusStats docStatusStats
DocStatusStats docStatusStats,
long maxLastIndexRequestTimestamp
) {
this.indexCount = indexCount;
this.indexTimeInMillis = indexTimeInMillis;
Expand All @@ -207,6 +213,7 @@ public Stats(
this.isThrottled = isThrottled;
this.throttleTimeInMillis = throttleTimeInMillis;
this.docStatusStats = docStatusStats;
this.maxLastIndexRequestTimestamp = maxLastIndexRequestTimestamp;
}

public void add(Stats stats) {
Expand All @@ -226,6 +233,8 @@ public void add(Stats stats) {
if (getDocStatusStats() != null) {
getDocStatusStats().add(stats.getDocStatusStats());
}

maxLastIndexRequestTimestamp = Math.max(maxLastIndexRequestTimestamp, stats.maxLastIndexRequestTimestamp);
}

/**
Expand Down Expand Up @@ -299,6 +308,10 @@ public DocStatusStats getDocStatusStats() {
return docStatusStats;
}

public long getMaxLastIndexRequestTimestamp() {
return maxLastIndexRequestTimestamp;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(indexCount);
Expand All @@ -311,7 +324,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(noopUpdateCount);
out.writeBoolean(isThrottled);
out.writeLong(throttleTimeInMillis);

if (out.getVersion().onOrAfter(Version.V_3_1_0)) {
out.writeVLong(maxLastIndexRequestTimestamp);
}
if (out.getVersion().onOrAfter(Version.V_2_11_0)) {
out.writeOptionalWriteable(docStatusStats);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@
package org.opensearch.index.shard;

import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.metrics.MaxMetric;
import org.opensearch.common.metrics.MeanMetric;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.engine.Engine;
import org.opensearch.threadpool.ThreadPool;

import java.util.concurrent.TimeUnit;

Expand All @@ -47,6 +49,11 @@
*/
final class InternalIndexingStats implements IndexingOperationListener {
private final StatsHolder totalStats = new StatsHolder();
private final ThreadPool threadPool;

InternalIndexingStats(ThreadPool threadPool) {
this.threadPool = threadPool;
}

/**
* Returns the stats, including type specific stats. If the types are null/0 length, then nothing
Expand Down Expand Up @@ -74,6 +81,8 @@ public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult re
long took = result.getTook();
totalStats.indexMetric.inc(took);
totalStats.indexCurrent.dec();
long now = threadPool.absoluteTimeInMillis();
totalStats.maxLastIndexRequestTimestamp.collect(now);
}
break;
case FAILURE:
Expand Down Expand Up @@ -142,6 +151,7 @@ static class StatsHolder {
private final CounterMetric indexFailed = new CounterMetric();
private final CounterMetric deleteCurrent = new CounterMetric();
private final CounterMetric noopUpdates = new CounterMetric();
private final MaxMetric maxLastIndexRequestTimestamp = new MaxMetric();

IndexingStats.Stats stats(boolean isThrottled, long currentThrottleMillis) {
return new IndexingStats.Stats(
Expand All @@ -155,7 +165,8 @@ IndexingStats.Stats stats(boolean isThrottled, long currentThrottleMillis) {
noopUpdates.count(),
isThrottled,
TimeUnit.MILLISECONDS.toMillis(currentThrottleMillis),
new IndexingStats.Stats.DocStatusStats()
new IndexingStats.Stats.DocStatusStats(),
maxLastIndexRequestTimestamp.get()
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,15 @@ protected Table getTableWithHeader(final RestRequest request, final PageToken pa

table.addCell("search.throttled", "alias:sth;default:false;desc:indicates if the index is search throttled");

table.addCell(
"last_index_request_timestamp",
"alias:last_index_ts,lastIndexRequestTimestamp;default:false;text-align:right;desc:timestamp of the last processed index request (epoch millis)"
);
table.addCell(
"last_index_request_timestamp_string",
"alias:last_index_ts_string,lastIndexRequestTimestampString;default:false;text-align:right;desc:timestamp of the last processed index request (ISO8601 string)"
);

table.endHeaders();
return table;
}
Expand Down Expand Up @@ -1058,8 +1067,13 @@ protected Table buildTable(

table.addCell(searchThrottled);

table.endRow();
table.addCell(totalStats.getIndexing() == null ? null : totalStats.getIndexing().getTotal().getMaxLastIndexRequestTimestamp());
Long ts = totalStats.getIndexing() == null ? null : totalStats.getIndexing().getTotal().getMaxLastIndexRequestTimestamp();
table.addCell(
ts == null || ts == 0 ? null : STRICT_DATE_TIME_FORMATTER.format(Instant.ofEpochMilli(ts).atZone(ZoneOffset.UTC))
);

table.endRow();
}

return table;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,64 @@ public void testToXContentForIndexingStats() throws IOException {
assertEquals(expected, xContentBuilder.toString());
}

/**
* Tests aggregation logic for maxLastIndexRequestTimestamp in IndexingStats.Stats.
* Uses reflection because the field is private and not settable via public API.
* This ensures that aggregation (add) always surfaces the maximum value, even across multiple adds and random values.
*/
public void testMaxLastIndexRequestTimestampAggregation() throws Exception {
// Use explicit values for all fields except the timestamp
IndexingStats.Stats.DocStatusStats docStatusStats = new IndexingStats.Stats.DocStatusStats();
long ts1 = randomLongBetween(0, 1000000);
long ts2 = randomLongBetween(0, 1000000);
long ts3 = randomLongBetween(0, 1000000);
IndexingStats.Stats stats1 = new IndexingStats.Stats(1, 2, 3, 4, 5, 6, 7, 8, false, 9, docStatusStats, ts1);
IndexingStats.Stats stats2 = new IndexingStats.Stats(1, 2, 3, 4, 5, 6, 7, 8, false, 9, docStatusStats, ts2);
IndexingStats.Stats stats3 = new IndexingStats.Stats(1, 2, 3, 4, 5, 6, 7, 8, false, 9, docStatusStats, ts3);

// Aggregate stats1 + stats2
stats1.add(stats2);
assertEquals(Math.max(ts1, ts2), stats1.getMaxLastIndexRequestTimestamp());

// Aggregate stats1 + stats3
stats1.add(stats3);
assertEquals(Math.max(Math.max(ts1, ts2), ts3), stats1.getMaxLastIndexRequestTimestamp());

// Test with zero and negative values
IndexingStats.Stats statsZero = new IndexingStats.Stats(1, 2, 3, 4, 5, 6, 7, 8, false, 9, docStatusStats, 0L);
IndexingStats.Stats statsNeg = new IndexingStats.Stats(1, 2, 3, 4, 5, 6, 7, 8, false, 9, docStatusStats, -100L);
statsZero.add(statsNeg);
assertEquals(0L, statsZero.getMaxLastIndexRequestTimestamp());

IndexingStats.Stats statsNeg2 = new IndexingStats.Stats(1, 2, 3, 4, 5, 6, 7, 8, false, 9, docStatusStats, -50L);
statsNeg.add(statsNeg2);
assertEquals(-50L, statsNeg.getMaxLastIndexRequestTimestamp());
}

public void testMaxLastIndexRequestTimestampBackwardCompatibility() throws IOException {
IndexingStats.Stats.DocStatusStats docStatusStats = new IndexingStats.Stats.DocStatusStats();
long ts = randomLongBetween(0, 1000000);
IndexingStats.Stats stats = new IndexingStats.Stats(1, 2, 3, 4, 5, 6, 7, 8, false, 9, docStatusStats, ts);

// Serialize with V_3_1_0 (should include the field)
BytesStreamOutput outNew = new BytesStreamOutput();
outNew.setVersion(org.opensearch.Version.V_3_1_0);
stats.writeTo(outNew);
StreamInput inNew = outNew.bytes().streamInput();
inNew.setVersion(org.opensearch.Version.V_3_1_0);
IndexingStats.Stats deserializedNew = new IndexingStats.Stats(inNew);
assertEquals(ts, deserializedNew.getMaxLastIndexRequestTimestamp());

// Serialize with V_2_11_0 (should NOT include the field, should default to 0)
BytesStreamOutput outOld = new BytesStreamOutput();
outOld.setVersion(org.opensearch.Version.V_2_11_0);
stats.writeTo(outOld);
StreamInput inOld = outOld.bytes().streamInput();
inOld.setVersion(org.opensearch.Version.V_2_11_0);
IndexingStats.Stats deserializedOld = new IndexingStats.Stats(inOld);
assertEquals(0L, deserializedOld.getMaxLastIndexRequestTimestamp());
}

private IndexingStats createTestInstance() {
IndexingStats.Stats.DocStatusStats docStatusStats = new IndexingStats.Stats.DocStatusStats();
for (int i = 1; i < 6; ++i) {
Expand All @@ -132,7 +190,8 @@ private IndexingStats createTestInstance() {
randomNonNegativeLong(),
randomBoolean(),
randomNonNegativeLong(),
docStatusStats
docStatusStats,
randomNonNegativeLong()
);

return new IndexingStats(stats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,14 @@ private void assertTableHeaders(Table table) {
assertThat(headers.get(3).value, equalTo("uuid"));
assertThat(headers.get(4).value, equalTo("pri"));
assertThat(headers.get(5).value, equalTo("rep"));
// Check for new columns (at the end)
boolean foundRaw = false, foundString = false;
for (Table.Cell cell : headers) {
if ("last_index_request_timestamp".equals(cell.value)) foundRaw = true;
if ("last_index_request_timestamp_string".equals(cell.value)) foundString = true;
}
assertTrue(foundRaw);
assertTrue(foundString);
}

private void assertTableRows(Table table) {
Expand Down Expand Up @@ -241,4 +249,64 @@ private void assertTableRows(Table table) {
}
}
}

public void testLastIndexRequestTimestampColumns() {
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final Settings settings = Settings.builder().build();
final ResponseLimitSettings responseLimitSettings = new ResponseLimitSettings(clusterSettings, settings);
final RestIndicesAction action = new RestIndicesAction(responseLimitSettings);
// Setup a known timestamp
long knownTs = 1710000000000L;
IndexStats indexStats = mock(IndexStats.class);
CommonStats commonStats = mock(CommonStats.class);
org.opensearch.index.shard.IndexingStats indexingStats = mock(org.opensearch.index.shard.IndexingStats.class);
org.opensearch.index.shard.IndexingStats.Stats stats = mock(org.opensearch.index.shard.IndexingStats.Stats.class);
when(indexStats.getTotal()).thenReturn(commonStats);
when(indexStats.getPrimaries()).thenReturn(commonStats);
when(commonStats.getIndexing()).thenReturn(indexingStats);
when(indexingStats.getTotal()).thenReturn(stats);
when(stats.getMaxLastIndexRequestTimestamp()).thenReturn(knownTs);
Map<String, IndexStats> testStats = new LinkedHashMap<>();
String testIndex = "test-index";
testStats.put(testIndex, indexStats);
Map<String, Settings> testSettings = new LinkedHashMap<>();
testSettings.put(testIndex, Settings.EMPTY);
Map<String, IndexMetadata> testMetadatas = new LinkedHashMap<>();
Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT).build();
testMetadatas.put(
testIndex,
IndexMetadata.builder(testIndex).settings(indexSettings).numberOfShards(1).numberOfReplicas(0).build()
);
Map<String, ClusterIndexHealth> testHealths = new LinkedHashMap<>();
Table table = action.buildTable(
new FakeRestRequest(),
testSettings,
testHealths,
testStats,
testMetadatas,
action.getTableIterator(new String[] { testIndex }, testSettings),
null
);
// Find the columns
List<Table.Cell> header = table.getHeaders();
int rawIdx = -1, strIdx = -1;
for (int i = 0; i < header.size(); i++) {
if ("last_index_request_timestamp".equals(header.get(i).value)) rawIdx = i;
if ("last_index_request_timestamp_string".equals(header.get(i).value)) strIdx = i;
}
assertTrue(rawIdx != -1);
assertTrue(strIdx != -1);
List<List<Table.Cell>> rows = table.getRows();
assertEquals(1, rows.size());
List<Table.Cell> row = rows.get(0);
assertEquals(String.valueOf(knownTs), row.get(rawIdx).value.toString());
// Robust: parse the string as ISO-8601 and compare to knownTs
String timestampString = row.get(strIdx).value.toString();
try {
java.time.Instant parsed = java.time.Instant.parse(timestampString);
assertEquals(knownTs, parsed.toEpochMilli());
} catch (java.time.format.DateTimeParseException e) {
fail("Timestamp string is not a valid ISO-8601 date: " + timestampString);
}
}
}
Loading