Skip to content

Commit f93ddc5

Browse files
author
Ajay Kumar Movva
committed
Added Node Stats api changes for the Point in time
Signed-off-by: Ajay Kumar Movva <[email protected]>
1 parent 00db112 commit f93ddc5

File tree

7 files changed

+173
-6
lines changed

7 files changed

+173
-6
lines changed

server/src/main/java/org/opensearch/index/search/stats/SearchStats.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@ public static class Stats implements Writeable, ToXContentFragment {
7777
private long suggestTimeInMillis;
7878
private long suggestCurrent;
7979

80+
private long pitCount;
81+
private long pitTimeInMillis;
82+
private long pitCurrent;
83+
8084
private Stats() {
8185
// for internal use, initializes all counts to 0
8286
}
@@ -91,6 +95,9 @@ public Stats(
9195
long scrollCount,
9296
long scrollTimeInMillis,
9397
long scrollCurrent,
98+
long pitCount,
99+
long pitTimeInMillis,
100+
long pitCurrent,
94101
long suggestCount,
95102
long suggestTimeInMillis,
96103
long suggestCurrent
@@ -110,6 +117,10 @@ public Stats(
110117
this.suggestCount = suggestCount;
111118
this.suggestTimeInMillis = suggestTimeInMillis;
112119
this.suggestCurrent = suggestCurrent;
120+
121+
this.pitCount = pitCount;
122+
this.pitTimeInMillis = pitTimeInMillis;
123+
this.pitCurrent = pitCurrent;
113124
}
114125

115126
private Stats(StreamInput in) throws IOException {
@@ -128,6 +139,10 @@ private Stats(StreamInput in) throws IOException {
128139
suggestCount = in.readVLong();
129140
suggestTimeInMillis = in.readVLong();
130141
suggestCurrent = in.readVLong();
142+
143+
pitCount = in.readVLong();
144+
pitTimeInMillis = in.readVLong();
145+
pitCurrent = in.readVLong();
131146
}
132147

133148
public void add(Stats stats) {
@@ -146,6 +161,10 @@ public void add(Stats stats) {
146161
suggestCount += stats.suggestCount;
147162
suggestTimeInMillis += stats.suggestTimeInMillis;
148163
suggestCurrent += stats.suggestCurrent;
164+
165+
pitCount += stats.pitCount;
166+
pitTimeInMillis += stats.pitTimeInMillis;
167+
pitCurrent += stats.pitCurrent;
149168
}
150169

151170
public void addForClosingShard(Stats stats) {
@@ -162,6 +181,10 @@ public void addForClosingShard(Stats stats) {
162181

163182
suggestCount += stats.suggestCount;
164183
suggestTimeInMillis += stats.suggestTimeInMillis;
184+
185+
pitCount += stats.pitCount;
186+
pitTimeInMillis += stats.pitTimeInMillis;
187+
pitCurrent += stats.pitCurrent;
165188
}
166189

167190
public long getQueryCount() {
@@ -212,6 +235,22 @@ public long getScrollCurrent() {
212235
return scrollCurrent;
213236
}
214237

238+
public long getPitCount() {
239+
return pitCount;
240+
}
241+
242+
public TimeValue getPitTime() {
243+
return new TimeValue(pitTimeInMillis);
244+
}
245+
246+
public long getPitTimeInMillis() {
247+
return pitTimeInMillis;
248+
}
249+
250+
public long getPitCurrent() {
251+
return pitCurrent;
252+
}
253+
215254
public long getSuggestCount() {
216255
return suggestCount;
217256
}
@@ -249,6 +288,10 @@ public void writeTo(StreamOutput out) throws IOException {
249288
out.writeVLong(suggestCount);
250289
out.writeVLong(suggestTimeInMillis);
251290
out.writeVLong(suggestCurrent);
291+
292+
out.writeVLong(pitCount);
293+
out.writeVLong(pitTimeInMillis);
294+
out.writeVLong(pitCurrent);
252295
}
253296

254297
@Override
@@ -265,6 +308,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
265308
builder.humanReadableField(Fields.SCROLL_TIME_IN_MILLIS, Fields.SCROLL_TIME, getScrollTime());
266309
builder.field(Fields.SCROLL_CURRENT, scrollCurrent);
267310

311+
builder.field(Fields.PIT_TOTAL, pitCount);
312+
builder.humanReadableField(Fields.PIT_TIME_IN_MILLIS, Fields.PIT_TIME, getPitTime());
313+
builder.field(Fields.PIT_CURRENT, pitCurrent);
314+
268315
builder.field(Fields.SUGGEST_TOTAL, suggestCount);
269316
builder.humanReadableField(Fields.SUGGEST_TIME_IN_MILLIS, Fields.SUGGEST_TIME, getSuggestTime());
270317
builder.field(Fields.SUGGEST_CURRENT, suggestCurrent);
@@ -385,6 +432,10 @@ static final class Fields {
385432
static final String SCROLL_TIME = "scroll_time";
386433
static final String SCROLL_TIME_IN_MILLIS = "scroll_time_in_millis";
387434
static final String SCROLL_CURRENT = "scroll_current";
435+
static final String PIT_TOTAL = "pit_total";
436+
static final String PIT_TIME = "pit_time";
437+
static final String PIT_TIME_IN_MILLIS = "pit_time_in_millis";
438+
static final String PIT_CURRENT = "pit_current";
388439
static final String SUGGEST_TOTAL = "suggest_total";
389440
static final String SUGGEST_TIME = "suggest_time";
390441
static final String SUGGEST_TIME_IN_MILLIS = "suggest_time_in_millis";

server/src/main/java/org/opensearch/index/search/stats/ShardSearchStats.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,18 @@ public void onFreeScrollContext(ReaderContext readerContext) {
187187
totalStats.scrollMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - readerContext.getStartTimeInNano()));
188188
}
189189

190+
@Override
191+
public void onNewPitContext(ReaderContext readerContext) {
192+
totalStats.pitCurrent.inc();
193+
}
194+
195+
@Override
196+
public void onFreePitContext(ReaderContext readerContext) {
197+
totalStats.pitCurrent.dec();
198+
assert totalStats.pitCurrent.count() >= 0;
199+
totalStats.pitMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - readerContext.getStartTimeInNano()));
200+
}
201+
190202
/**
191203
* Holder of statistics values
192204
*
@@ -203,10 +215,12 @@ static final class StatsHolder {
203215
* for one-thousand times as long (i.e., scrolls that execute for almost twelve days on average).
204216
*/
205217
final MeanMetric scrollMetric = new MeanMetric();
218+
final MeanMetric pitMetric = new MeanMetric();
206219
final MeanMetric suggestMetric = new MeanMetric();
207220
final CounterMetric queryCurrent = new CounterMetric();
208221
final CounterMetric fetchCurrent = new CounterMetric();
209222
final CounterMetric scrollCurrent = new CounterMetric();
223+
final CounterMetric pitCurrent = new CounterMetric();
210224
final CounterMetric suggestCurrent = new CounterMetric();
211225

212226
SearchStats.Stats stats() {
@@ -220,6 +234,9 @@ SearchStats.Stats stats() {
220234
scrollMetric.count(),
221235
TimeUnit.MICROSECONDS.toMillis(scrollMetric.sum()),
222236
scrollCurrent.count(),
237+
pitMetric.count(),
238+
TimeUnit.MICROSECONDS.toMillis(pitMetric.sum()),
239+
pitCurrent.count(),
223240
suggestMetric.count(),
224241
TimeUnit.NANOSECONDS.toMillis(suggestMetric.sum()),
225242
suggestCurrent.count()

server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ public void testShardLevelSearchGroupStats() throws Exception {
4545
// let's create two dummy search stats with groups
4646
Map<String, Stats> groupStats1 = new HashMap<>();
4747
Map<String, Stats> groupStats2 = new HashMap<>();
48-
groupStats2.put("group1", new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1));
49-
SearchStats searchStats1 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats1);
50-
SearchStats searchStats2 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats2);
48+
groupStats2.put("group1", new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1));
49+
SearchStats searchStats1 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats1);
50+
SearchStats searchStats2 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats2);
5151

5252
// adding these two search stats and checking group stats are correct
5353
searchStats1.add(searchStats2);
@@ -75,6 +75,9 @@ private static void assertStats(Stats stats, long equalTo) {
7575
assertEquals(equalTo, stats.getScrollCount());
7676
assertEquals(equalTo, stats.getScrollTimeInMillis());
7777
assertEquals(equalTo, stats.getScrollCurrent());
78+
assertEquals(equalTo, stats.getPitCount());
79+
assertEquals(equalTo, stats.getPitTimeInMillis());
80+
assertEquals(equalTo, stats.getPitCurrent());
7881
assertEquals(equalTo, stats.getSuggestCount());
7982
assertEquals(equalTo, stats.getSuggestTimeInMillis());
8083
assertEquals(equalTo, stats.getSuggestCurrent());

server/src/test/java/org/opensearch/search/CreatePitMultiNodeTests.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.opensearch.test.OpenSearchIntegTestCase;
2828
import org.opensearch.threadpool.TestThreadPool;
2929
import org.opensearch.threadpool.ThreadPool;
30+
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
31+
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
3032

3133
import java.util.ArrayList;
3234
import java.util.HashSet;
@@ -70,6 +72,7 @@ public void testPit() throws Exception {
7072
.get();
7173
assertEquals(2, searchResponse.getSuccessfulShards());
7274
assertEquals(2, searchResponse.getTotalShards());
75+
validatePitStats("index", 2, 2);
7376
}
7477

7578
public void testCreatePitWhileNodeDropWithAllowPartialCreationFalse() throws Exception {
@@ -82,6 +85,7 @@ public Settings onNodeStopped(String nodeName) throws Exception {
8285
ExecutionException ex = expectThrows(ExecutionException.class, execute::get);
8386
assertTrue(ex.getMessage().contains("Failed to execute phase [create_pit]"));
8487
assertTrue(ex.getMessage().contains("Partial shards failure"));
88+
validatePitStats("index", 0, 0);
8589
return super.onNodeStopped(nodeName);
8690
}
8791
});
@@ -103,6 +107,7 @@ public Settings onNodeStopped(String nodeName) throws Exception {
103107
.get();
104108
assertEquals(1, searchResponse.getSuccessfulShards());
105109
assertEquals(1, searchResponse.getTotalShards());
110+
validatePitStats("index", 1, 1);
106111
return super.onNodeStopped(nodeName);
107112
}
108113
});
@@ -124,6 +129,7 @@ public Settings onNodeStopped(String nodeName) throws Exception {
124129
assertEquals(1, searchResponse.getFailedShards());
125130
assertEquals(0, searchResponse.getSkippedShards());
126131
assertEquals(2, searchResponse.getTotalShards());
132+
validatePitStats("index", 1, 1);
127133
return super.onNodeStopped(nodeName);
128134
}
129135
});
@@ -312,4 +318,16 @@ public void onFailure(Exception e) {}
312318
ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
313319
}
314320
}
321+
322+
public void validatePitStats(String index, long expectedPitCurrent, long expectedOpenContexts) throws ExecutionException,
323+
InterruptedException {
324+
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
325+
indicesStatsRequest.indices("index");
326+
indicesStatsRequest.all();
327+
IndicesStatsResponse indicesStatsResponse = client().admin().indices().stats(indicesStatsRequest).get();
328+
long pitCurrent = indicesStatsResponse.getIndex(index).getTotal().search.getTotal().getPitCurrent();
329+
long openContexts = indicesStatsResponse.getIndex(index).getTotal().search.getOpenContexts();
330+
assertEquals(expectedPitCurrent, pitCurrent);
331+
assertEquals(expectedOpenContexts, openContexts);
332+
}
315333
}

server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
import org.opensearch.search.builder.PointInTimeBuilder;
2424
import org.opensearch.search.sort.SortOrder;
2525
import org.opensearch.test.OpenSearchSingleNodeTestCase;
26+
import org.opensearch.index.IndexService;
27+
import org.opensearch.index.shard.IndexShard;
28+
import org.opensearch.indices.IndicesService;
2629

2730
import java.util.Map;
2831
import java.util.concurrent.CountDownLatch;
@@ -72,7 +75,11 @@ public void testCreatePITSuccess() throws ExecutionException, InterruptedExcepti
7275

7376
SearchService service = getInstanceFromNode(SearchService.class);
7477
assertEquals(2, service.getActiveContexts());
78+
validatePitStats("index", 1, 0, 0);
79+
validatePitStats("index", 1, 0, 1);
7580
service.doClose(); // this kills the keep-alive reaper we have to reset the node after this test
81+
validatePitStats("index", 0, 1, 0);
82+
validatePitStats("index", 0, 1, 1);
7683
}
7784

7885
public void testCreatePITWithMultipleIndicesSuccess() throws ExecutionException, InterruptedException {
@@ -88,7 +95,12 @@ public void testCreatePITWithMultipleIndicesSuccess() throws ExecutionException,
8895
CreatePitResponse response = execute.get();
8996
assertEquals(4, response.getSuccessfulShards());
9097
assertEquals(4, service.getActiveContexts());
98+
99+
validatePitStats("index", 1, 0, 0);
100+
validatePitStats("index1", 1, 0, 0);
91101
service.doClose();
102+
validatePitStats("index", 0, 1, 0);
103+
validatePitStats("index1", 0, 1, 0);
92104
}
93105

94106
public void testCreatePITWithShardReplicasSuccess() throws ExecutionException, InterruptedException {
@@ -109,7 +121,11 @@ public void testCreatePITWithShardReplicasSuccess() throws ExecutionException, I
109121

110122
SearchService service = getInstanceFromNode(SearchService.class);
111123
assertEquals(2, service.getActiveContexts());
124+
validatePitStats("index", 1, 0, 0);
125+
validatePitStats("index", 1, 0, 1);
112126
service.doClose();
127+
validatePitStats("index", 0, 1, 0);
128+
validatePitStats("index", 0, 1, 1);
113129
}
114130

115131
public void testCreatePITWithNonExistentIndex() {
@@ -192,6 +208,9 @@ public void testPitSearchOnCloseIndex() throws ExecutionException, InterruptedEx
192208
CreatePitResponse pitResponse = execute.get();
193209
SearchService service = getInstanceFromNode(SearchService.class);
194210
assertEquals(2, service.getActiveContexts());
211+
validatePitStats("index", 1, 0, 0);
212+
validatePitStats("index", 1, 0, 1);
213+
195214
client().admin().indices().prepareClose("index").get();
196215
SearchPhaseExecutionException ex = expectThrows(SearchPhaseExecutionException.class, () -> {
197216
SearchResponse searchResponse = client().prepareSearch()
@@ -239,7 +258,10 @@ public void testMaxOpenPitContexts() throws Exception {
239258
+ "This limit can be set by changing the [search.max_open_pit_context] setting."
240259
)
241260
);
261+
final int maxPitContexts = SearchService.MAX_OPEN_PIT_CONTEXT.get(Settings.EMPTY);
262+
validatePitStats("index", maxPitContexts, 0, 0);
242263
service.doClose();
264+
validatePitStats("index", 0, maxPitContexts, 0);
243265
}
244266

245267
public void testOpenPitContextsConcurrently() throws Exception {
@@ -285,7 +307,9 @@ public void testOpenPitContextsConcurrently() throws Exception {
285307
thread.join();
286308
}
287309
assertThat(service.getActiveContexts(), equalTo(maxPitContexts));
310+
validatePitStats("index", maxPitContexts, 0, 0);
288311
service.doClose();
312+
validatePitStats("index", 0, maxPitContexts, 0);
289313
}
290314

291315
/**
@@ -453,9 +477,11 @@ public void testPitAfterUpdateIndex() throws Exception {
453477
.getTotalHits().value,
454478
Matchers.equalTo(0L)
455479
);
480+
validatePitStats("test", 1, 0, 0);
456481
} finally {
457482
service.doClose();
458483
assertEquals(0, service.getActiveContexts());
484+
validatePitStats("test", 0, 1, 0);
459485
}
460486
}
461487

@@ -495,7 +521,20 @@ public void testConcurrentSearches() throws Exception {
495521

496522
SearchService service = getInstanceFromNode(SearchService.class);
497523
assertEquals(2, service.getActiveContexts());
524+
validatePitStats("index", 1, 0, 0);
525+
validatePitStats("index", 1, 0, 1);
498526
service.doClose();
499527
assertEquals(0, service.getActiveContexts());
528+
validatePitStats("index", 0, 1, 0);
529+
validatePitStats("index", 0, 1, 1);
530+
}
531+
532+
public void validatePitStats(String index, long expectedPitCurrent, long expectedPitCount, int shardId) throws ExecutionException,
533+
InterruptedException {
534+
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
535+
IndexService indexService = indicesService.indexServiceSafe(resolveIndex(index));
536+
IndexShard indexShard = indexService.getShard(shardId);
537+
assertEquals(expectedPitCurrent, indexShard.searchStats().getTotal().getPitCurrent());
538+
assertEquals(expectedPitCount, indexShard.searchStats().getTotal().getPitCount());
500539
}
501540
}

0 commit comments

Comments
 (0)