Skip to content

Commit 04db50a

Browse files
authored
Support AutoExpand for SearchReplica (#17741)
1 parent 137683e commit 04db50a

File tree

15 files changed

+632
-39
lines changed

15 files changed

+632
-39
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1616
- [Security Manager Replacement] Create initial Java Agent to intercept Socket::connect calls ([#17724](https://github.com/opensearch-project/OpenSearch/pull/17724))
1717
- Add ingestion management APIs for pause, resume and get ingestion state ([#17631](https://github.com/opensearch-project/OpenSearch/pull/17631))
1818
- [Security Manager Replacement] Enhance Java Agent to intercept System::exit ([#17746](https://github.com/opensearch-project/OpenSearch/pull/17746))
19+
- Support AutoExpand for SearchReplica ([#17741](https://github.com/opensearch-project/OpenSearch/pull/17741))
1920

2021
### Changed
2122
- Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.metadata;
10+
11+
import org.opensearch.cluster.routing.UnassignedInfo;
12+
import org.opensearch.common.settings.Settings;
13+
import org.opensearch.common.unit.TimeValue;
14+
import org.opensearch.common.util.FeatureFlags;
15+
import org.opensearch.indices.replication.common.ReplicationType;
16+
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
17+
import org.opensearch.test.InternalTestCluster;
18+
import org.opensearch.test.OpenSearchIntegTestCase;
19+
20+
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
21+
22+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
23+
public class AutoExpandSearchReplicasIT extends RemoteStoreBaseIntegTestCase {
24+
25+
@Override
26+
protected Settings featureFlagSettings() {
27+
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build();
28+
}
29+
30+
public void testAutoExpandSearchReplica() throws Exception {
31+
String indexName = "test";
32+
internalCluster().startClusterManagerOnlyNode();
33+
34+
// Create a cluster with 2 data nodes and 1 search node
35+
internalCluster().startDataOnlyNode();
36+
internalCluster().startDataOnlyNode();
37+
String searchNode = internalCluster().startSearchOnlyNode();
38+
39+
// Create index with 1 primary, 1 replica and 1 search replica shards
40+
createIndex(
41+
indexName,
42+
Settings.builder()
43+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
44+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
45+
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
46+
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
47+
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueMillis(0))
48+
.build()
49+
);
50+
ensureGreen();
51+
52+
assertBusy(() -> assertEquals(1, getNumShards(indexName).numSearchReplicas));
53+
54+
// Enable auto expand for search replica
55+
client().admin()
56+
.indices()
57+
.prepareUpdateSettings(indexName)
58+
.setSettings(Settings.builder().put("index.auto_expand_search_replicas", "0-all"))
59+
.get();
60+
61+
// Add 1 more search nodes
62+
internalCluster().startSearchOnlyNode();
63+
64+
assertBusy(() -> assertEquals(2, getNumShards(indexName).numSearchReplicas));
65+
66+
// Stop a node which hosts search replica
67+
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(searchNode));
68+
assertBusy(() -> assertEquals(1, getNumShards(indexName).numSearchReplicas));
69+
70+
// Add 1 more search nodes
71+
internalCluster().startSearchOnlyNode();
72+
assertBusy(() -> assertEquals(2, getNumShards(indexName).numSearchReplicas));
73+
}
74+
}

server/src/main/java/org/opensearch/cluster/metadata/AutoExpandReplicas.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131

3232
package org.opensearch.cluster.metadata;
3333

34-
import org.opensearch.cluster.node.DiscoveryNode;
3534
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
3635
import org.opensearch.cluster.routing.allocation.decider.Decision;
3736
import org.opensearch.common.Booleans;
@@ -142,13 +141,14 @@ public boolean isEnabled() {
142141

143142
private OptionalInt getDesiredNumberOfReplicas(IndexMetadata indexMetadata, RoutingAllocation allocation) {
144143
if (enabled) {
145-
int numMatchingDataNodes = 0;
146-
for (final DiscoveryNode cursor : allocation.nodes().getDataNodes().values()) {
147-
Decision decision = allocation.deciders().shouldAutoExpandToNode(indexMetadata, cursor, allocation);
148-
if (decision.type() != Decision.Type.NO) {
149-
numMatchingDataNodes++;
150-
}
151-
}
144+
int numMatchingDataNodes = (int) allocation.nodes()
145+
.getDataNodes()
146+
.values()
147+
.stream()
148+
.filter(node -> node.isSearchNode() == false)
149+
.map(node -> allocation.deciders().shouldAutoExpandToNode(indexMetadata, node, allocation))
150+
.filter(decision -> decision.type() != Decision.Type.NO)
151+
.count();
152152

153153
final int min = getMinReplicas();
154154
final int max = getMaxReplicas(numMatchingDataNodes);
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.metadata;
10+
11+
import org.opensearch.cluster.node.DiscoveryNode;
12+
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
13+
import org.opensearch.cluster.routing.allocation.decider.Decision;
14+
import org.opensearch.common.Booleans;
15+
import org.opensearch.common.settings.Setting;
16+
import org.opensearch.common.settings.Setting.Property;
17+
18+
import java.util.ArrayList;
19+
import java.util.HashMap;
20+
import java.util.List;
21+
import java.util.Map;
22+
import java.util.OptionalInt;
23+
24+
import static org.opensearch.cluster.metadata.MetadataIndexStateService.isIndexVerifiedBeforeClosed;
25+
26+
/**
27+
* This class acts as a functional wrapper around the {@code index.auto_expand_search_replicas} setting.
28+
* This setting's value expands into a minimum and maximum value, requiring special handling based on the
29+
* number of search nodes in the cluster. This class handles parsing and simplifies access to these values.
30+
*
31+
* @opensearch.internal
32+
*/
33+
public final class AutoExpandSearchReplicas {
34+
// the value we recognize in the "max" position to mean all the search nodes
35+
private static final String ALL_NODES_VALUE = "all";
36+
37+
private static final AutoExpandSearchReplicas FALSE_INSTANCE = new AutoExpandSearchReplicas(0, 0, false);
38+
39+
public static final Setting<AutoExpandSearchReplicas> SETTING = new Setting<>(
40+
IndexMetadata.SETTING_AUTO_EXPAND_SEARCH_REPLICAS,
41+
"false",
42+
AutoExpandSearchReplicas::parse,
43+
Property.Dynamic,
44+
Property.IndexScope
45+
);
46+
47+
private static AutoExpandSearchReplicas parse(String value) {
48+
final int min;
49+
final int max;
50+
if (Booleans.isFalse(value)) {
51+
return FALSE_INSTANCE;
52+
}
53+
final int dash = value.indexOf('-');
54+
if (-1 == dash) {
55+
throw new IllegalArgumentException(
56+
"failed to parse [" + IndexMetadata.SETTING_AUTO_EXPAND_SEARCH_REPLICAS + "] from value: [" + value + "] at index " + dash
57+
);
58+
}
59+
final String sMin = value.substring(0, dash);
60+
try {
61+
min = Integer.parseInt(sMin);
62+
} catch (NumberFormatException e) {
63+
throw new IllegalArgumentException(
64+
"failed to parse [" + IndexMetadata.SETTING_AUTO_EXPAND_SEARCH_REPLICAS + "] from value: [" + value + "] at index " + dash,
65+
e
66+
);
67+
}
68+
String sMax = value.substring(dash + 1);
69+
if (sMax.equals(ALL_NODES_VALUE)) {
70+
max = Integer.MAX_VALUE;
71+
} else {
72+
try {
73+
max = Integer.parseInt(sMax);
74+
} catch (NumberFormatException e) {
75+
throw new IllegalArgumentException(
76+
"failed to parse ["
77+
+ IndexMetadata.SETTING_AUTO_EXPAND_SEARCH_REPLICAS
78+
+ "] from value: ["
79+
+ value
80+
+ "] at index "
81+
+ dash,
82+
e
83+
);
84+
}
85+
}
86+
return new AutoExpandSearchReplicas(min, max, true);
87+
}
88+
89+
private final int minSearchReplicas;
90+
private final int maxSearchReplicas;
91+
private final boolean enabled;
92+
93+
private AutoExpandSearchReplicas(int minReplicas, int maxReplicas, boolean enabled) {
94+
if (minReplicas > maxReplicas) {
95+
throw new IllegalArgumentException(
96+
"["
97+
+ IndexMetadata.SETTING_AUTO_EXPAND_SEARCH_REPLICAS
98+
+ "] minSearchReplicas must be =< maxSearchReplicas but wasn't "
99+
+ minReplicas
100+
+ " > "
101+
+ maxReplicas
102+
);
103+
}
104+
this.minSearchReplicas = minReplicas;
105+
this.maxSearchReplicas = maxReplicas;
106+
this.enabled = enabled;
107+
}
108+
109+
int getMinSearchReplicas() {
110+
return minSearchReplicas;
111+
}
112+
113+
public int getMaxSearchReplicas() {
114+
return maxSearchReplicas;
115+
}
116+
117+
public boolean isEnabled() {
118+
return enabled;
119+
}
120+
121+
private OptionalInt getDesiredNumberOfSearchReplicas(IndexMetadata indexMetadata, RoutingAllocation allocation) {
122+
int numMatchingSearchNodes = (int) allocation.nodes()
123+
.getDataNodes()
124+
.values()
125+
.stream()
126+
.filter(DiscoveryNode::isSearchNode)
127+
.map(node -> allocation.deciders().shouldAutoExpandToNode(indexMetadata, node, allocation))
128+
.filter(decision -> decision.type() != Decision.Type.NO)
129+
.count();
130+
131+
return calculateNumberOfSearchReplicas(numMatchingSearchNodes);
132+
}
133+
134+
// package private for testing
135+
OptionalInt calculateNumberOfSearchReplicas(int numMatchingSearchNodes) {
136+
// Calculate the maximum possible number of search replicas
137+
int maxPossibleReplicas = Math.min(numMatchingSearchNodes, maxSearchReplicas);
138+
139+
// Determine the number of search replicas
140+
int numberOfSearchReplicas = Math.max(minSearchReplicas, maxPossibleReplicas);
141+
142+
// Additional check to ensure we don't exceed max possible search replicas
143+
if (numberOfSearchReplicas <= maxPossibleReplicas) {
144+
return OptionalInt.of(numberOfSearchReplicas);
145+
}
146+
147+
return OptionalInt.empty();
148+
}
149+
150+
@Override
151+
public String toString() {
152+
return enabled ? minSearchReplicas + "-" + maxSearchReplicas : "false";
153+
}
154+
155+
/**
156+
* Checks if there are search replicas with the auto-expand feature that need to be adapted.
157+
* Returns a map of updates, which maps the indices to be updated to the desired number of search replicas.
158+
* The map has the desired number of search replicas as key and the indices to update as value, as this allows the result
159+
* of this method to be directly applied to RoutingTable.Builder#updateNumberOfSearchReplicas.
160+
*/
161+
public static Map<Integer, List<String>> getAutoExpandSearchReplicaChanges(Metadata metadata, RoutingAllocation allocation) {
162+
Map<Integer, List<String>> updatedSearchReplicas = new HashMap<>();
163+
164+
for (final IndexMetadata indexMetadata : metadata) {
165+
if (indexMetadata.getState() == IndexMetadata.State.OPEN || isIndexVerifiedBeforeClosed(indexMetadata)) {
166+
AutoExpandSearchReplicas autoExpandSearchReplicas = SETTING.get(indexMetadata.getSettings());
167+
if (autoExpandSearchReplicas.isEnabled()) {
168+
autoExpandSearchReplicas.getDesiredNumberOfSearchReplicas(indexMetadata, allocation)
169+
.ifPresent(numberOfSearchReplicas -> {
170+
if (numberOfSearchReplicas != indexMetadata.getNumberOfSearchOnlyReplicas()) {
171+
updatedSearchReplicas.computeIfAbsent(numberOfSearchReplicas, ArrayList::new)
172+
.add(indexMetadata.getIndex().getName());
173+
}
174+
});
175+
}
176+
}
177+
}
178+
return updatedSearchReplicas;
179+
}
180+
}

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,9 @@ public Iterator<Setting<?>> settings() {
492492
);
493493

494494
public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas";
495+
public static final String SETTING_AUTO_EXPAND_SEARCH_REPLICAS = "index.auto_expand_search_replicas";
495496
public static final Setting<AutoExpandReplicas> INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING;
497+
public static final Setting<AutoExpandSearchReplicas> INDEX_AUTO_EXPAND_SEARCH_REPLICAS_SETTING = AutoExpandSearchReplicas.SETTING;
496498

497499
/**
498500
* Blocks the API.

server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1505,7 +1505,10 @@ List<String> getIndexSettingsValidationErrors(
15051505

15061506
Optional<String> replicaValidationError = awarenessReplicaBalance.validate(replicaCount, autoExpandReplica);
15071507
replicaValidationError.ifPresent(validationErrors::add);
1508-
Optional<String> searchReplicaValidationError = awarenessReplicaBalance.validate(searchReplicaCount);
1508+
Optional<String> searchReplicaValidationError = awarenessReplicaBalance.validate(
1509+
searchReplicaCount,
1510+
AutoExpandSearchReplicas.SETTING.get(settings)
1511+
);
15091512
searchReplicaValidationError.ifPresent(validationErrors::add);
15101513
}
15111514
return validationErrors;

server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,10 @@ public ClusterState execute(ClusterState currentState) {
304304
for (Index index : request.indices()) {
305305
if (index.getName().charAt(0) != '.') {
306306
// No replica count validation for system indices
307-
Optional<String> error = awarenessReplicaBalance.validate(updatedNumberOfSearchReplicas);
307+
Optional<String> error = awarenessReplicaBalance.validate(
308+
updatedNumberOfSearchReplicas,
309+
AutoExpandSearchReplicas.SETTING.get(openSettings)
310+
);
308311

309312
if (error.isPresent()) {
310313
ValidationException ex = new ValidationException();

0 commit comments

Comments
 (0)