Skip to content

Commit 9a1ed6f

Browse files
imRishN authored and Vishalks committed
Add DecommissionService and helper to execute awareness attribute decommissioning (opensearch-project#4084)
* Add Executor to decommission node attribute
* Decommission service implementation with cluster metadata
* Master abdication changes to decommission local awareness leader
* Update join validator changes to validate decommissioned node join request

Signed-off-by: Rishab Nahata <[email protected]>
1 parent a9a4d51 commit 9a1ed6f

20 files changed

+2226
-3
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
4545
- [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948))
4646
- [Remote Store] Change behaviour in replica recovery for remote translog enabled indices ([#4318](https://github.com/opensearch-project/OpenSearch/pull/4318))
4747
- Unmute test RelocationIT.testRelocationWhileIndexingRandom ([#4580](https://github.com/opensearch-project/OpenSearch/pull/4580))
48+
- Add DecommissionService and helper to execute awareness attribute decommissioning ([#4084](https://github.com/opensearch-project/OpenSearch/pull/4084))
49+
4850

4951
### Deprecated
5052

server/src/main/java/org/opensearch/OpenSearchException.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1608,6 +1608,18 @@ private enum OpenSearchExceptionHandle {
16081608
org.opensearch.index.shard.PrimaryShardClosedException::new,
16091609
162,
16101610
V_3_0_0
1611+
),
1612+
DECOMMISSIONING_FAILED_EXCEPTION(
1613+
org.opensearch.cluster.decommission.DecommissioningFailedException.class,
1614+
org.opensearch.cluster.decommission.DecommissioningFailedException::new,
1615+
163,
1616+
V_3_0_0
1617+
),
1618+
NODE_DECOMMISSIONED_EXCEPTION(
1619+
org.opensearch.cluster.decommission.NodeDecommissionedException.class,
1620+
org.opensearch.cluster.decommission.NodeDecommissionedException::new,
1621+
164,
1622+
V_3_0_0
16111623
);
16121624

16131625
final Class<? extends OpenSearchException> exceptionClass;

server/src/main/java/org/opensearch/cluster/ClusterModule.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.opensearch.cluster.action.index.MappingUpdatedAction;
3636
import org.opensearch.cluster.action.index.NodeMappingRefreshAction;
3737
import org.opensearch.cluster.action.shard.ShardStateAction;
38+
import org.opensearch.cluster.decommission.DecommissionAttributeMetadata;
3839
import org.opensearch.cluster.metadata.ComponentTemplateMetadata;
3940
import org.opensearch.cluster.metadata.ComposableIndexTemplateMetadata;
4041
import org.opensearch.cluster.metadata.DataStreamMetadata;
@@ -193,6 +194,12 @@ public static List<Entry> getNamedWriteables() {
193194
);
194195
registerMetadataCustom(entries, DataStreamMetadata.TYPE, DataStreamMetadata::new, DataStreamMetadata::readDiffFrom);
195196
registerMetadataCustom(entries, WeightedRoutingMetadata.TYPE, WeightedRoutingMetadata::new, WeightedRoutingMetadata::readDiffFrom);
197+
registerMetadataCustom(
198+
entries,
199+
DecommissionAttributeMetadata.TYPE,
200+
DecommissionAttributeMetadata::new,
201+
DecommissionAttributeMetadata::readDiffFrom
202+
);
196203
// Task Status (not Diffable)
197204
entries.add(new Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME, PersistentTasksNodeService.Status::new));
198205
return entries;
@@ -283,6 +290,13 @@ public static List<NamedXContentRegistry.Entry> getNamedXWriteables() {
283290
WeightedRoutingMetadata::fromXContent
284291
)
285292
);
293+
entries.add(
294+
new NamedXContentRegistry.Entry(
295+
Metadata.Custom.class,
296+
new ParseField(DecommissionAttributeMetadata.TYPE),
297+
DecommissionAttributeMetadata::fromXContent
298+
)
299+
);
286300
return entries;
287301
}
288302

server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@
3939
import org.opensearch.cluster.ClusterStateTaskExecutor;
4040
import org.opensearch.cluster.NotClusterManagerException;
4141
import org.opensearch.cluster.block.ClusterBlocks;
42+
import org.opensearch.cluster.decommission.DecommissionAttribute;
43+
import org.opensearch.cluster.decommission.DecommissionAttributeMetadata;
44+
import org.opensearch.cluster.decommission.DecommissionStatus;
45+
import org.opensearch.cluster.decommission.NodeDecommissionedException;
4246
import org.opensearch.cluster.metadata.IndexMetadata;
4347
import org.opensearch.cluster.metadata.Metadata;
4448
import org.opensearch.cluster.node.DiscoveryNode;
@@ -358,6 +362,7 @@ public boolean runOnlyOnClusterManager() {
358362

359363
/**
360364
* a task indicates that the current node should become master
365+
*
361366
* @deprecated As of 2.0, because supporting inclusive language, replaced by {@link #newBecomeClusterManagerTask()}
362367
*/
363368
@Deprecated
@@ -384,8 +389,9 @@ public static Task newFinishElectionTask() {
384389
* Ensures that all indices are compatible with the given node version. This will ensure that all indices in the given metadata
385390
* will not be created with a newer version of opensearch as well as that all indices are newer or equal to the minimum index
386391
* compatibility version.
387-
* @see Version#minimumIndexCompatibilityVersion()
392+
*
388393
* @throws IllegalStateException if any index is incompatible with the given version
394+
* @see Version#minimumIndexCompatibilityVersion()
389395
*/
390396
public static void ensureIndexCompatibility(final Version nodeVersion, Metadata metadata) {
391397
Version supportedIndexVersion = nodeVersion.minimumIndexCompatibilityVersion();
@@ -415,14 +421,18 @@ public static void ensureIndexCompatibility(final Version nodeVersion, Metadata
415421
}
416422
}
417423

418-
/** ensures that the joining node has a version that's compatible with all current nodes*/
424+
/**
425+
* ensures that the joining node has a version that's compatible with all current nodes
426+
*/
419427
public static void ensureNodesCompatibility(final Version joiningNodeVersion, DiscoveryNodes currentNodes) {
420428
final Version minNodeVersion = currentNodes.getMinNodeVersion();
421429
final Version maxNodeVersion = currentNodes.getMaxNodeVersion();
422430
ensureNodesCompatibility(joiningNodeVersion, minNodeVersion, maxNodeVersion);
423431
}
424432

425-
/** ensures that the joining node has a version that's compatible with a given version range */
433+
/**
434+
* ensures that the joining node has a version that's compatible with a given version range
435+
*/
426436
public static void ensureNodesCompatibility(Version joiningNodeVersion, Version minClusterNodeVersion, Version maxClusterNodeVersion) {
427437
assert minClusterNodeVersion.onOrBefore(maxClusterNodeVersion) : minClusterNodeVersion + " > " + maxClusterNodeVersion;
428438
if (joiningNodeVersion.isCompatible(maxClusterNodeVersion) == false) {
@@ -466,13 +476,34 @@ public static void ensureMajorVersionBarrier(Version joiningNodeVersion, Version
466476
}
467477
}
468478

479+
public static void ensureNodeCommissioned(DiscoveryNode node, Metadata metadata) {
480+
DecommissionAttributeMetadata decommissionAttributeMetadata = metadata.decommissionAttributeMetadata();
481+
if (decommissionAttributeMetadata != null) {
482+
DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute();
483+
DecommissionStatus status = decommissionAttributeMetadata.status();
484+
if (decommissionAttribute != null && status != null) {
485+
// We will let the node join the cluster if the current status is in FAILED state
486+
if (node.getAttributes().get(decommissionAttribute.attributeName()).equals(decommissionAttribute.attributeValue())
487+
&& status.equals(DecommissionStatus.FAILED) == false) {
488+
throw new NodeDecommissionedException(
489+
"node [{}] has decommissioned attribute [{}] with current status of decommissioning [{}]",
490+
node.toString(),
491+
decommissionAttribute.toString(),
492+
status.status()
493+
);
494+
}
495+
}
496+
}
497+
}
498+
469499
public static Collection<BiConsumer<DiscoveryNode, ClusterState>> addBuiltInJoinValidators(
470500
Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators
471501
) {
472502
final Collection<BiConsumer<DiscoveryNode, ClusterState>> validators = new ArrayList<>();
473503
validators.add((node, state) -> {
474504
ensureNodesCompatibility(node.getVersion(), state.getNodes());
475505
ensureIndexCompatibility(node.getVersion(), state.getMetadata());
506+
ensureNodeCommissioned(node, state.getMetadata());
476507
});
477508
validators.addAll(onJoinValidators);
478509
return Collections.unmodifiableCollection(validators);
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.decommission;
10+
11+
import org.opensearch.common.io.stream.StreamInput;
12+
import org.opensearch.common.io.stream.StreamOutput;
13+
import org.opensearch.common.io.stream.Writeable;
14+
15+
import java.io.IOException;
16+
import java.util.Objects;
17+
18+
/**
19+
* {@link DecommissionAttribute} encapsulates information about decommissioned node attribute like attribute name, attribute value.
20+
*
21+
* @opensearch.internal
22+
*/
23+
public final class DecommissionAttribute implements Writeable {
24+
private final String attributeName;
25+
private final String attributeValue;
26+
27+
/**
28+
* Constructs new decommission attribute name value pair
29+
*
30+
* @param attributeName attribute name
31+
* @param attributeValue attribute value
32+
*/
33+
public DecommissionAttribute(String attributeName, String attributeValue) {
34+
this.attributeName = attributeName;
35+
this.attributeValue = attributeValue;
36+
}
37+
38+
/**
39+
* Returns attribute name
40+
*
41+
* @return attributeName
42+
*/
43+
public String attributeName() {
44+
return this.attributeName;
45+
}
46+
47+
/**
48+
* Returns attribute value
49+
*
50+
* @return attributeValue
51+
*/
52+
public String attributeValue() {
53+
return this.attributeValue;
54+
}
55+
56+
public DecommissionAttribute(StreamInput in) throws IOException {
57+
attributeName = in.readString();
58+
attributeValue = in.readString();
59+
}
60+
61+
/**
62+
* Writes decommission attribute name value to stream output
63+
*
64+
* @param out stream output
65+
*/
66+
@Override
67+
public void writeTo(StreamOutput out) throws IOException {
68+
out.writeString(attributeName);
69+
out.writeString(attributeValue);
70+
}
71+
72+
@Override
73+
public boolean equals(Object o) {
74+
if (this == o) return true;
75+
if (o == null || getClass() != o.getClass()) return false;
76+
77+
DecommissionAttribute that = (DecommissionAttribute) o;
78+
79+
if (!attributeName.equals(that.attributeName)) return false;
80+
return attributeValue.equals(that.attributeValue);
81+
}
82+
83+
@Override
84+
public int hashCode() {
85+
return Objects.hash(attributeName, attributeValue);
86+
}
87+
88+
@Override
89+
public String toString() {
90+
return "DecommissionAttribute{" + "attributeName='" + attributeName + '\'' + ", attributeValue='" + attributeValue + '\'' + '}';
91+
}
92+
}

0 commit comments

Comments
 (0)