Skip to content

Search Replica Allocation and Recovery #17457

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Mar 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
1dc226b
Restrict Search Replicas to Allocate only to Search dedicated node
vinaykpud Feb 25, 2025
9e84ef1
fixed the javadoc
vinaykpud Feb 25, 2025
945c7d8
fixed tests
vinaykpud Feb 26, 2025
fcbbd1b
Treat Regular and Search Replicas Separately to Prevent Allocation Bl…
vinaykpud Feb 28, 2025
7f8cfab
Updated tests and some refactor
vinaykpud Mar 1, 2025
1406dc1
Fixed SearchReplica recovery scenario for same node and new node
vinaykpud Mar 1, 2025
7e205ae
Updated the logic for SearchReplica recovery scenario for new node
vinaykpud Mar 3, 2025
4616b16
Fixed nits after self review
vinaykpud Mar 6, 2025
49c88de
Modified the search replica allocation based on node attribute
vinaykpud Mar 7, 2025
219e9a4
fixed PR comments
vinaykpud Mar 18, 2025
e7f2fe1
Revert "Fixed SearchReplica recovery scenario for same node and new n…
vinaykpud Mar 18, 2025
c4adc21
Separated the recovery flow method for search replica
vinaykpud Mar 18, 2025
b8ac81f
Revert "fixed PR comments"
vinaykpud Mar 18, 2025
a5f45c4
Added unit tests in IndexShardTests
vinaykpud Mar 18, 2025
cf217c0
updated method name and minor refactor
vinaykpud Mar 18, 2025
e97d8f7
Removed search replica recovery logic from internalRecoverFromStore m…
vinaykpud Mar 18, 2025
428cbf3
Added integ test to cover search node restart scenario
vinaykpud Mar 18, 2025
1ecdbc3
Applied search node role in tests and removed searchonly attribute
vinaykpud Mar 18, 2025
501ba77
Fixed failing test
vinaykpud Mar 19, 2025
d197bd0
Removed unwanted comment
vinaykpud Mar 19, 2025
79a8da2
Address PR comments
vinaykpud Mar 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,21 @@
import java.util.stream.Collectors;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchReplicaFilteringAllocationIT extends RemoteStoreBaseIntegTestCase {
public class SearchReplicaAllocationIT extends RemoteStoreBaseIntegTestCase {

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build();
}

public void testSearchReplicaDedicatedIncludes() {
List<String> nodesIds = internalCluster().startNodes(3);
final String node_0 = nodesIds.get(0);
final String node_1 = nodesIds.get(1);
final String node_2 = nodesIds.get(2);
assertEquals(3, cluster().size());
public void testSearchReplicaAllocatedToDedicatedSearchNode() {
internalCluster().startClusterManagerOnlyNode();
String primaryNode = internalCluster().startDataOnlyNode();
internalCluster().startSearchOnlyNode();

client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node_1 + "," + node_0)
)
.execute()
.actionGet();
assertEquals(3, cluster().size());

createIndex(
"test",
Expand All @@ -57,42 +47,16 @@ public void testSearchReplicaDedicatedIncludes() {
.build()
);
ensureGreen("test");
// ensure primary is not on node 0 or 1,
// ensure primary is not on searchNode
IndexShardRoutingTable routingTable = getRoutingTable();
assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId()));

String existingSearchReplicaNode = getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId());
String emptyAllowedNode = existingSearchReplicaNode.equals(node_0) ? node_1 : node_0;

// set the included nodes to the other open node, search replica should relocate to that node.
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", emptyAllowedNode))
.execute()
.actionGet();
ensureGreen("test");

routingTable = getRoutingTable();
assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId()));
assertEquals(emptyAllowedNode, getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId()));
assertEquals(primaryNode, getNodeName(routingTable.primaryShard().currentNodeId()));
}

public void testSearchReplicaDedicatedIncludes_DoNotAssignToOtherNodes() {
List<String> nodesIds = internalCluster().startNodes(3);
final String node_0 = nodesIds.get(0);
final String node_1 = nodesIds.get(1);
final String node_2 = nodesIds.get(2);
internalCluster().startNodes(2);
final String node_1 = internalCluster().startSearchOnlyNode();
assertEquals(3, cluster().size());

// set filter on 1 node and set search replica count to 2 - should leave 1 unassigned
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node_1))
.execute()
.actionGet();

logger.info("--> creating an index with no replicas");
createIndex(
"test",
Expand All @@ -115,9 +79,32 @@ public void testSearchReplicaDedicatedIncludes_DoNotAssignToOtherNodes() {
assertEquals(1, routingTable.searchOnlyReplicas().stream().filter(ShardRouting::unassigned).count());
}

public void testSearchReplicaDedicatedIncludes_WhenNotSetDoNotAssign() {
internalCluster().startNodes(2);
assertEquals(2, cluster().size());

createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build()
);
ensureYellowAndNoInitializingShards("test");
IndexShardRoutingTable routingTable = getRoutingTable();
assertNull(routingTable.searchOnlyReplicas().get(0).currentNodeId());

// Add a search node
final String searchNode = internalCluster().startSearchOnlyNode();

ensureGreen("test");
assertEquals(searchNode, getNodeName(getRoutingTable().searchOnlyReplicas().get(0).currentNodeId()));
}

private IndexShardRoutingTable getRoutingTable() {
IndexShardRoutingTable routingTable = getClusterState().routingTable().index("test").getShards().get(0);
return routingTable;
return getClusterState().routingTable().index("test").getShards().get(0);
}

private String getNodeName(String id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,13 @@
import org.junit.After;

import java.nio.file.Path;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.cluster.routing.RecoverySource.Type.EMPTY_STORE;
import static org.opensearch.cluster.routing.RecoverySource.Type.EXISTING_STORE;
import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchReplicaReplicationAndRecoveryIT extends SegmentReplicationBaseIT {
Expand Down Expand Up @@ -84,20 +82,22 @@ public void testReplication() throws Exception {
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startDataOnlyNode();
final String searchNode = internalCluster().startSearchOnlyNode();

ensureGreen(INDEX_NAME);

final int docCount = 10;
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);
waitForSearchableDocs(docCount, primary, replica);
waitForSearchableDocs(docCount, primary, searchNode);
}

public void testSegmentReplicationStatsResponseWithSearchReplica() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final List<String> nodes = internalCluster().startDataOnlyNodes(2);
final String searchNode = internalCluster().startSearchOnlyNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(
INDEX_NAME,
Settings.builder()
Expand All @@ -107,14 +107,15 @@ public void testSegmentReplicationStatsResponseWithSearchReplica() throws Except
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build()
);

ensureGreen(INDEX_NAME);

final int docCount = 5;
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);
waitForSearchableDocs(docCount, nodes);
waitForSearchableDocs(docCount, primary, searchNode);

SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
.indices()
Expand Down Expand Up @@ -142,46 +143,35 @@ public void testSegmentReplicationStatsResponseWithSearchReplica() throws Except
public void testSearchReplicaRecovery() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primary = internalCluster().startDataOnlyNode();
final String replica = internalCluster().startDataOnlyNode();

// ensure search replicas are only allocated to "replica" node.
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", replica))
.execute()
.actionGet();
final String searchNode = internalCluster().startSearchOnlyNode();

createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);
assertRecoverySourceType(replica, EMPTY_STORE);
assertRecoverySourceType(searchNode, EMPTY_STORE);

final int docCount = 10;
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);
flush(INDEX_NAME);
waitForSearchableDocs(10, primary, replica);
waitForSearchableDocs(10, primary, searchNode);

// Node stats should show remote download stats as nonzero, use this as a precondition to compare
// post restart.
assertDownloadStats(replica, true);
NodesStatsResponse nodesStatsResponse;
NodeStats nodeStats;
assertDownloadStats(searchNode, true);

internalCluster().restartNode(replica);
internalCluster().restartNode(searchNode);
ensureGreen(INDEX_NAME);
assertDocCounts(10, replica);
assertDocCounts(10, searchNode);

// assert existing store recovery
assertRecoverySourceType(replica, EXISTING_STORE);
assertDownloadStats(replica, false);
assertRecoverySourceType(searchNode, EXISTING_STORE);
assertDownloadStats(searchNode, false);
}

public void testRecoveryAfterDocsIndexed() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primary = internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final int docCount = 10;
Expand All @@ -190,13 +180,14 @@ public void testRecoveryAfterDocsIndexed() throws Exception {
}
refresh(INDEX_NAME);

final String replica = internalCluster().startDataOnlyNode();
final String searchNode = internalCluster().startSearchOnlyNode();

ensureGreen(INDEX_NAME);
assertDocCounts(10, replica);
assertDocCounts(10, searchNode);

assertRecoverySourceType(replica, EMPTY_STORE);
assertRecoverySourceType(searchNode, EMPTY_STORE);
// replica should have downloaded from remote
assertDownloadStats(replica, true);
assertDownloadStats(searchNode, true);

client().admin()
.indices()
Expand All @@ -212,14 +203,14 @@ public void testRecoveryAfterDocsIndexed() throws Exception {
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1))
.get();
ensureGreen(INDEX_NAME);
assertDocCounts(10, replica);
assertDocCounts(10, searchNode);

internalCluster().restartNode(replica);
internalCluster().restartNode(searchNode);

ensureGreen(INDEX_NAME);
assertDocCounts(10, replica);
assertRecoverySourceType(replica, EXISTING_STORE);
assertDownloadStats(replica, false);
assertDocCounts(10, searchNode);
assertRecoverySourceType(searchNode, EXISTING_STORE);
assertDownloadStats(searchNode, false);
}

private static void assertRecoverySourceType(String replica, RecoverySource.Type recoveryType) throws InterruptedException,
Expand Down Expand Up @@ -257,29 +248,30 @@ public void testStopPrimary_RestoreOnNewNode() throws Exception {
refresh(INDEX_NAME);
assertDocCounts(docCount, primary);

final String replica = internalCluster().startDataOnlyNode();
final String searchNode = internalCluster().startSearchOnlyNode();

ensureGreen(INDEX_NAME);
assertDocCounts(docCount, replica);
assertDocCounts(docCount, searchNode);
// stop the primary
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary));

assertBusy(() -> {
ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(INDEX_NAME).get();
assertEquals(ClusterHealthStatus.RED, clusterHealthResponse.getStatus());
});
assertDocCounts(docCount, replica);
assertDocCounts(docCount, searchNode);

String restoredPrimary = internalCluster().startDataOnlyNode();

client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());
ensureGreen(INDEX_NAME);
assertDocCounts(docCount, replica, restoredPrimary);
assertDocCounts(docCount, searchNode, restoredPrimary);

for (int i = docCount; i < docCount * 2; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);
assertBusy(() -> assertDocCounts(20, replica, restoredPrimary));
assertBusy(() -> assertDocCounts(20, searchNode, restoredPrimary));
}

public void testFailoverToNewPrimaryWithPollingReplication() throws Exception {
Expand All @@ -293,9 +285,10 @@ public void testFailoverToNewPrimaryWithPollingReplication() throws Exception {
}
refresh(INDEX_NAME);

final String replica = internalCluster().startDataOnlyNode();
final String searchNode = internalCluster().startSearchOnlyNode();

ensureGreen(INDEX_NAME);
assertDocCounts(10, replica);
assertDocCounts(10, searchNode);

client().admin()
.indices()
Expand All @@ -314,12 +307,12 @@ public void testFailoverToNewPrimaryWithPollingReplication() throws Exception {
});
ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(INDEX_NAME).get();
assertEquals(ClusterHealthStatus.YELLOW, clusterHealthResponse.getStatus());
assertDocCounts(10, replica);
assertDocCounts(10, searchNode);

for (int i = docCount; i < docCount * 2; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);
assertBusy(() -> assertDocCounts(20, replica, writer_replica));
assertBusy(() -> assertDocCounts(20, searchNode, writer_replica));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,17 @@ public void testSearchReplicaRestore_WhenSnapshotOnSegRep_RestoreOnSegRepWithSea
Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1).build()
);
ensureYellowAndNoInitializingShards(RESTORED_INDEX_NAME);
internalCluster().startDataOnlyNode();

internalCluster().startSearchOnlyNode();

ensureGreen(RESTORED_INDEX_NAME);
assertEquals(1, getNumberOfSearchReplicas(RESTORED_INDEX_NAME));

SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get();
assertHitCount(resp, DOC_COUNT);
}

public void testSearchReplicaRestore_WhenSnapshotOnSegRepWithSearchReplica_RestoreOnDocRep() throws Exception {
public void testSearchReplicaRestore_WhenSnapshotOnSegRepWithSearchReplica_RestoreOnDocRep() {
bootstrapIndexWithSearchReplicas();
createRepoAndSnapshot(REPOSITORY_NAME, FS_REPOSITORY_TYPE, SNAPSHOT_NAME, INDEX_NAME);

Expand All @@ -98,7 +100,7 @@ public void testSearchReplicaRestore_WhenSnapshotOnSegRepWithSearchReplica_Resto
}

private void bootstrapIndexWithOutSearchReplicas(ReplicationType replicationType) throws InterruptedException {
startCluster(2);
internalCluster().startNodes(2);

Settings settings = Settings.builder()
.put(super.indexSettings())
Expand All @@ -114,8 +116,9 @@ private void bootstrapIndexWithOutSearchReplicas(ReplicationType replicationType
ensureGreen(INDEX_NAME);
}

private void bootstrapIndexWithSearchReplicas() throws InterruptedException {
startCluster(3);
private void bootstrapIndexWithSearchReplicas() {
internalCluster().startNodes(2);
internalCluster().startSearchOnlyNode();

Settings settings = Settings.builder()
.put(super.indexSettings())
Expand All @@ -126,18 +129,14 @@ private void bootstrapIndexWithSearchReplicas() throws InterruptedException {
.build();

createIndex(INDEX_NAME, settings);

ensureGreen(INDEX_NAME);
for (int i = 0; i < DOC_COUNT; i++) {
client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", "bar").get();
}
flushAndRefresh(INDEX_NAME);
}

private void startCluster(int numOfNodes) {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNodes(numOfNodes);
}

private void createRepoAndSnapshot(String repositoryName, String repositoryType, String snapshotName, String indexName) {
createRepository(repositoryName, repositoryType, randomRepoPath().toAbsolutePath());
createSnapshot(repositoryName, snapshotName, List.of(indexName));
Expand Down
Loading
Loading