Skip to content

Commit b3ad02a

Browse files
Add node-left metric (#18421)
* Add node-left metric Signed-off-by: Bhumika Sharma <[email protected]>
1 parent 5255f5b commit b3ad02a

File tree

13 files changed

+128
-18
lines changed

13 files changed

+128
-18
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4646
- Add FIPS build tooling ([#4254](https://github.com/opensearch-project/security/issues/4254))
4747
- Support Nested Aggregations as part of Star-Tree ([#18048](https://github.com/opensearch-project/OpenSearch/pull/18048))
4848
- [Star-Tree] Support for date-range queries with star-tree supported aggregations ([#17855](https://github.com/opensearch-project/OpenSearch/pull/17855))
49+
- Added node-left metric to cluster manager ([#18421](https://github.com/opensearch-project/OpenSearch/pull/18421))
4950
- [Star tree] Remove star tree feature flag and add index setting to configure star tree search on index basis ([#18070](https://github.com/opensearch-project/OpenSearch/pull/18070))
5051
- Approximation Framework Enhancement: Update the BKD traversal logic to improve the performance on skewed data ([#18439](https://github.com/opensearch-project/OpenSearch/issues/18439))
5152

server/src/main/java/org/opensearch/cluster/ClusterManagerMetrics.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
*/
2424
public final class ClusterManagerMetrics {
2525

26+
public static final String FOLLOWER_NODE_ID_TAG = "follower_node_id";
27+
public static final String REASON_TAG = "reason";
2628
private static final String LATENCY_METRIC_UNIT_MS = "ms";
2729
private static final String COUNTER_METRICS_UNIT = "1";
2830

@@ -36,6 +38,7 @@ public final class ClusterManagerMetrics {
3638
public final Counter followerChecksFailureCounter;
3739
public final Counter asyncFetchFailureCounter;
3840
public final Counter asyncFetchSuccessCounter;
41+
public final Counter nodeLeftCounter;
3942

4043
public ClusterManagerMetrics(MetricsRegistry metricsRegistry) {
4144
clusterStateAppliersHistogram = metricsRegistry.createHistogram(
@@ -83,7 +86,7 @@ public ClusterManagerMetrics(MetricsRegistry metricsRegistry) {
8386
"Counter for number of successful async fetches",
8487
COUNTER_METRICS_UNIT
8588
);
86-
89+
nodeLeftCounter = metricsRegistry.createCounter("node.left.count", "Counter for node left operation", COUNTER_METRICS_UNIT);
8790
}
8891

8992
public void recordLatency(Histogram histogram, Double value) {

server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
import org.opensearch.monitor.NodeHealthService;
9191
import org.opensearch.monitor.StatusInfo;
9292
import org.opensearch.node.remotestore.RemoteStoreNodeService;
93+
import org.opensearch.telemetry.metrics.tags.Tags;
9394
import org.opensearch.threadpool.Scheduler;
9495
import org.opensearch.threadpool.ThreadPool.Names;
9596
import org.opensearch.transport.TransportService;
@@ -111,6 +112,12 @@
111112
import java.util.stream.Stream;
112113
import java.util.stream.StreamSupport;
113114

115+
import static org.opensearch.cluster.ClusterManagerMetrics.FOLLOWER_NODE_ID_TAG;
116+
import static org.opensearch.cluster.ClusterManagerMetrics.REASON_TAG;
117+
import static org.opensearch.cluster.coordination.FollowersChecker.NODE_LEFT_REASON_DISCONNECTED;
118+
import static org.opensearch.cluster.coordination.FollowersChecker.NODE_LEFT_REASON_FOLLOWER_CHECK_RETRY_FAIL;
119+
import static org.opensearch.cluster.coordination.FollowersChecker.NODE_LEFT_REASON_HEALTHCHECK_FAIL;
120+
import static org.opensearch.cluster.coordination.FollowersChecker.NODE_LEFT_REASON_LAGGING;
114121
import static org.opensearch.cluster.coordination.NoClusterManagerBlockService.NO_CLUSTER_MANAGER_BLOCK_ID;
115122
import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned;
116123
import static org.opensearch.gateway.ClusterStateUpdaters.hideStateIfNotRecovered;
@@ -193,6 +200,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
193200
private final RemoteStoreNodeService remoteStoreNodeService;
194201
private NodeConnectionsService nodeConnectionsService;
195202
private final ClusterSettings clusterSettings;
203+
private final ClusterManagerMetrics clusterManagerMetrics;
196204

197205
/**
198206
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
@@ -250,6 +258,7 @@ public Coordinator(
250258
this.publishTimeout = PUBLISH_TIMEOUT_SETTING.get(settings);
251259
clusterSettings.addSettingsUpdateConsumer(PUBLISH_TIMEOUT_SETTING, this::setPublishTimeout);
252260
this.publishInfoTimeout = PUBLISH_INFO_TIMEOUT_SETTING.get(settings);
261+
this.clusterManagerMetrics = clusterManagerMetrics;
253262
this.random = random;
254263
this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool());
255264
this.preVoteCollector = new PreVoteCollector(
@@ -359,6 +368,20 @@ private void removeNode(DiscoveryNode discoveryNode, String reason) {
359368
nodeRemovalExecutor,
360369
nodeRemovalExecutor
361370
);
371+
String reasonToPublish = switch (reason) {
372+
case NODE_LEFT_REASON_DISCONNECTED -> "disconnected";
373+
case NODE_LEFT_REASON_LAGGING -> "lagging";
374+
case NODE_LEFT_REASON_FOLLOWER_CHECK_RETRY_FAIL -> "follower.check.fail";
375+
case NODE_LEFT_REASON_HEALTHCHECK_FAIL -> "health.check.fail";
376+
default -> reason;
377+
};
378+
clusterManagerMetrics.incrementCounter(
379+
clusterManagerMetrics.nodeLeftCounter,
380+
1.0,
381+
Optional.ofNullable(
382+
Tags.create().addTag(FOLLOWER_NODE_ID_TAG, discoveryNode.getId()).addTag(REASON_TAG, reasonToPublish)
383+
)
384+
);
362385
}
363386
}
364387
}

server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ public class FollowersChecker {
8686
private static final Logger logger = LogManager.getLogger(FollowersChecker.class);
8787

8888
public static final String FOLLOWER_CHECK_ACTION_NAME = "internal:coordination/fault_detection/follower_check";
89+
public static final String NODE_LEFT_REASON_LAGGING = "lagging";
90+
public static final String NODE_LEFT_REASON_DISCONNECTED = "disconnected";
91+
public static final String NODE_LEFT_REASON_HEALTHCHECK_FAIL = "health check failed";
92+
public static final String NODE_LEFT_REASON_FOLLOWER_CHECK_RETRY_FAIL = "followers check retry count exceeded";
8993

9094
// the time between checks sent to each node
9195
public static final Setting<TimeValue> FOLLOWER_CHECK_INTERVAL_SETTING = Setting.timeSetting(
@@ -398,13 +402,13 @@ public void handleException(TransportException exp) {
398402
final String reason;
399403
if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
400404
logger.info(() -> new ParameterizedMessage("{} disconnected", FollowerChecker.this), exp);
401-
reason = "disconnected";
405+
reason = NODE_LEFT_REASON_DISCONNECTED;
402406
} else if (exp.getCause() instanceof NodeHealthCheckFailureException) {
403407
logger.info(() -> new ParameterizedMessage("{} health check failed", FollowerChecker.this), exp);
404-
reason = "health check failed";
408+
reason = NODE_LEFT_REASON_HEALTHCHECK_FAIL;
405409
} else if (failureCountSinceLastSuccess >= followerCheckRetryCount) {
406410
logger.info(() -> new ParameterizedMessage("{} failed too many times", FollowerChecker.this), exp);
407-
reason = "followers check retry count exceeded";
411+
reason = NODE_LEFT_REASON_FOLLOWER_CHECK_RETRY_FAIL;
408412
} else {
409413
logger.info(() -> new ParameterizedMessage("{} failed, retrying", FollowerChecker.this), exp);
410414
scheduleNextWakeUp();

server/src/test/java/org/opensearch/cluster/coordination/CoordinatorTests.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,17 @@
5757
import org.opensearch.gateway.GatewayService;
5858
import org.opensearch.monitor.StatusInfo;
5959
import org.opensearch.test.MockLogAppender;
60+
import org.opensearch.test.telemetry.TestInMemoryCounter;
61+
import org.opensearch.test.telemetry.TestInMemoryMetricsRegistry;
6062

6163
import java.io.IOException;
6264
import java.util.Arrays;
65+
import java.util.HashMap;
6366
import java.util.HashSet;
6467
import java.util.List;
6568
import java.util.Map;
6669
import java.util.Set;
70+
import java.util.concurrent.ConcurrentHashMap;
6771
import java.util.concurrent.atomic.AtomicBoolean;
6872
import java.util.concurrent.atomic.AtomicReference;
6973
import java.util.function.Function;
@@ -250,6 +254,25 @@ public void testUnhealthyNodesGetsRemoved() {
250254
assertThat(lastCommittedConfiguration + " should be 3 nodes", lastCommittedConfiguration.getNodeIds().size(), equalTo(3));
251255
assertFalse(lastCommittedConfiguration.getNodeIds().contains(newNode1.getId()));
252256
assertFalse(lastCommittedConfiguration.getNodeIds().contains(newNode2.getId()));
257+
258+
TestInMemoryMetricsRegistry clusterManagerMetricsRegistry = leader.getMetricsRegistry();
259+
TestInMemoryCounter nodeLeftCounter = clusterManagerMetricsRegistry.getCounterStore().get("node.left.count");
260+
assertNotNull("node.left.count counter should be present", nodeLeftCounter);
261+
ConcurrentHashMap<Map<String, ?>, Double> counterValuesByTags = nodeLeftCounter.getCounterValueForTags();
262+
263+
// Check for newNode1
264+
Map<String, Object> tags1 = new HashMap<>();
265+
tags1.put("follower_node_id", newNode1.getId());
266+
tags1.put("reason", "health.check.fail");
267+
assertTrue(counterValuesByTags.containsKey(tags1));
268+
assertEquals(Double.valueOf(1.0), counterValuesByTags.get(tags1));
269+
270+
// Check for newNode2
271+
Map<String, Object> tags2 = new HashMap<>();
272+
tags2.put("follower_node_id", newNode2.getId());
273+
tags2.put("reason", "health.check.fail");
274+
assertTrue(counterValuesByTags.containsKey(tags2));
275+
assertEquals(Double.valueOf(1.0), counterValuesByTags.get(tags2));
253276
}
254277
}
255278
}

server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,13 @@
4848
import org.opensearch.core.transport.TransportResponse.Empty;
4949
import org.opensearch.monitor.NodeHealthService;
5050
import org.opensearch.monitor.StatusInfo;
51-
import org.opensearch.telemetry.TestInMemoryMetricsRegistry;
5251
import org.opensearch.telemetry.metrics.MetricsRegistry;
5352
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
5453
import org.opensearch.telemetry.tracing.noop.NoopTracer;
5554
import org.opensearch.test.EqualsHashCodeTestUtils;
5655
import org.opensearch.test.EqualsHashCodeTestUtils.CopyFunction;
5756
import org.opensearch.test.OpenSearchTestCase;
57+
import org.opensearch.test.telemetry.TestInMemoryMetricsRegistry;
5858
import org.opensearch.test.transport.CapturingTransport;
5959
import org.opensearch.test.transport.MockTransport;
6060
import org.opensearch.threadpool.ThreadPool.Names;

server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,11 @@
4545
import org.opensearch.core.transport.TransportResponse;
4646
import org.opensearch.core.transport.TransportResponse.Empty;
4747
import org.opensearch.monitor.StatusInfo;
48-
import org.opensearch.telemetry.TestInMemoryMetricsRegistry;
4948
import org.opensearch.telemetry.tracing.noop.NoopTracer;
5049
import org.opensearch.test.EqualsHashCodeTestUtils;
5150
import org.opensearch.test.EqualsHashCodeTestUtils.CopyFunction;
5251
import org.opensearch.test.OpenSearchTestCase;
52+
import org.opensearch.test.telemetry.TestInMemoryMetricsRegistry;
5353
import org.opensearch.test.transport.CapturingTransport;
5454
import org.opensearch.test.transport.MockTransport;
5555
import org.opensearch.threadpool.ThreadPool.Names;

test/framework/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ dependencies {
3939
api project(':libs:opensearch-nio')
4040
api project(":server")
4141
api project(":libs:opensearch-cli")
42+
api project(":libs:opensearch-telemetry")
4243
api project(":test:telemetry")
4344
api "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}"
4445
api "junit:junit:${versions.junit}"

test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@
9696
import org.opensearch.test.OpenSearchTestCase;
9797
import org.opensearch.test.disruption.DisruptableMockTransport;
9898
import org.opensearch.test.disruption.DisruptableMockTransport.ConnectionStatus;
99+
import org.opensearch.test.telemetry.TestInMemoryMetricsRegistry;
99100
import org.opensearch.threadpool.Scheduler;
100101
import org.opensearch.threadpool.ThreadPool;
101102
import org.opensearch.transport.TransportInterceptor;
@@ -1059,6 +1060,7 @@ class ClusterNode {
10591060
private RepositoriesService repositoriesService;
10601061
private RemoteStoreNodeService remoteStoreNodeService;
10611062
List<BiConsumer<DiscoveryNode, ClusterState>> extraJoinValidators = new ArrayList<>();
1063+
private TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry();
10621064

10631065
ClusterNode(int nodeIndex, boolean clusterManagerEligible, Settings nodeSettings, NodeHealthService nodeHealthService) {
10641066
this(
@@ -1188,7 +1190,7 @@ protected Optional<DisruptableMockTransport> getDisruptableMockTransport(Transpo
11881190
nodeHealthService,
11891191
persistedStateRegistry,
11901192
remoteStoreNodeService,
1191-
new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE),
1193+
new ClusterManagerMetrics(metricsRegistry),
11921194
null
11931195
);
11941196
coordinator.setNodeConnectionsService(nodeConnectionsService);
@@ -1211,6 +1213,10 @@ protected Optional<DisruptableMockTransport> getDisruptableMockTransport(Transpo
12111213
coordinator.startInitialJoin();
12121214
}
12131215

1216+
public TestInMemoryMetricsRegistry getMetricsRegistry() {
1217+
return metricsRegistry;
1218+
}
1219+
12141220
void close() {
12151221
assertThat("must add nodes to a cluster before closing them", clusterNodes, hasItem(ClusterNode.this));
12161222
onNode(() -> {

server/src/test/java/org/opensearch/telemetry/TestInMemoryCounter.java renamed to test/telemetry/src/main/java/org/opensearch/test/telemetry/TestInMemoryCounter.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@
66
* compatible open source license.
77
*/
88

9-
package org.opensearch.telemetry;
9+
package org.opensearch.test.telemetry;
1010

1111
import org.opensearch.telemetry.metrics.Counter;
1212
import org.opensearch.telemetry.metrics.tags.Tags;
1313

14-
import java.util.HashMap;
14+
import java.util.Map;
1515
import java.util.concurrent.ConcurrentHashMap;
1616
import java.util.concurrent.atomic.AtomicInteger;
1717

@@ -24,13 +24,26 @@
2424
public class TestInMemoryCounter implements Counter {
2525

2626
private AtomicInteger counterValue = new AtomicInteger(0);
27-
private ConcurrentHashMap<HashMap<String, ?>, Double> counterValueForTags = new ConcurrentHashMap<>();
27+
private final ConcurrentHashMap<Map<String, ?>, Double> counterValueForTags = new ConcurrentHashMap<>();
2828

29+
/**
30+
* Constructor.
31+
*/
32+
public TestInMemoryCounter() {}
33+
34+
/**
35+
* returns the counter value.
36+
* @return the current value of the untagged counter
37+
*/
2938
public Integer getCounterValue() {
3039
return this.counterValue.get();
3140
}
3241

33-
public ConcurrentHashMap<HashMap<String, ?>, Double> getCounterValueForTags() {
42+
/**
43+
* Returns the counter values recorded per tag set.
44+
* @return the map from each tags map to the value accumulated for it
45+
*/
46+
public ConcurrentHashMap<Map<String, ?>, Double> getCounterValueForTags() {
3447
return this.counterValueForTags;
3548
}
3649

@@ -41,12 +54,12 @@ public void add(double value) {
4154

4255
@Override
4356
public synchronized void add(double value, Tags tags) {
44-
HashMap<String, ?> hashMap = (HashMap<String, ?>) tags.getTagsMap();
45-
if (counterValueForTags.get(hashMap) == null) {
46-
counterValueForTags.put(hashMap, value);
57+
Map<String, ?> tagsMap = tags.getTagsMap();
58+
if (counterValueForTags.get(tagsMap) == null) {
59+
counterValueForTags.put(tagsMap, value);
4760
} else {
48-
value = counterValueForTags.get(hashMap) + value;
49-
counterValueForTags.put(hashMap, value);
61+
value = counterValueForTags.get(tagsMap) + value;
62+
counterValueForTags.put(tagsMap, value);
5063
}
5164
}
5265
}

server/src/test/java/org/opensearch/telemetry/TestInMemoryHistogram.java renamed to test/telemetry/src/main/java/org/opensearch/test/telemetry/TestInMemoryHistogram.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
* compatible open source license.
77
*/
88

9-
package org.opensearch.telemetry;
9+
package org.opensearch.test.telemetry;
1010

1111
import org.opensearch.telemetry.metrics.Histogram;
1212
import org.opensearch.telemetry.metrics.tags.Tags;
@@ -26,10 +26,23 @@ public class TestInMemoryHistogram implements Histogram {
2626
private AtomicInteger histogramValue = new AtomicInteger(0);
2727
private ConcurrentHashMap<HashMap<String, ?>, Double> histogramValueForTags = new ConcurrentHashMap<>();
2828

29+
/**
30+
* Constructor.
31+
*/
32+
public TestInMemoryHistogram() {}
33+
34+
/**
35+
* Returns the Histogram value.
36+
* @return the current histogram value
37+
*/
2938
public Integer getHistogramValue() {
3039
return this.histogramValue.get();
3140
}
3241

42+
/**
43+
* Returns the Histogram value for tags
44+
* @return the map of histogram values keyed by tags map
45+
*/
3346
public ConcurrentHashMap<HashMap<String, ?>, Double> getHistogramValueForTags() {
3447
return this.histogramValueForTags;
3548
}

server/src/test/java/org/opensearch/telemetry/TestInMemoryMetricsRegistry.java renamed to test/telemetry/src/main/java/org/opensearch/test/telemetry/TestInMemoryMetricsRegistry.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
* compatible open source license.
77
*/
88

9-
package org.opensearch.telemetry;
9+
package org.opensearch.test.telemetry;
1010

1111
import org.opensearch.telemetry.metrics.Counter;
1212
import org.opensearch.telemetry.metrics.Histogram;
@@ -29,10 +29,23 @@ public class TestInMemoryMetricsRegistry implements MetricsRegistry {
2929
private ConcurrentHashMap<String, TestInMemoryCounter> counterStore = new ConcurrentHashMap<>();
3030
private ConcurrentHashMap<String, TestInMemoryHistogram> histogramStore = new ConcurrentHashMap<>();
3131

32+
/**
33+
* Constructor.
34+
*/
35+
public TestInMemoryMetricsRegistry() {}
36+
37+
/**
38+
* Returns counterStore
39+
* @return the map of {@link TestInMemoryCounter} instances held by this registry
40+
*/
3241
public ConcurrentHashMap<String, TestInMemoryCounter> getCounterStore() {
3342
return this.counterStore;
3443
}
3544

45+
/**
46+
* Returns the histogramStore.
47+
* @return the map of {@link TestInMemoryHistogram} instances held by this registry
48+
*/
3649
public ConcurrentHashMap<String, TestInMemoryHistogram> getHistogramStore() {
3750
return this.histogramStore;
3851
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/** In-memory telemetry test utilities: test counter, histogram and metrics registry. */
10+
package org.opensearch.test.telemetry;

0 commit comments

Comments
 (0)