Skip to content

Fix bug where replication lag grows post primary relocation #11238

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix for stuck update action in a bulk with `retry_on_conflict` property ([#11152](https://github.com/opensearch-project/OpenSearch/issues/11152))
- Remove shadowJar from `lang-painless` module publication ([#11369](https://github.com/opensearch-project/OpenSearch/issues/11369))
- Fix remote shards balancer and remove unused variables ([#11167](https://github.com/opensearch-project/OpenSearch/pull/11167))
- Fix bug where replication lag grows post primary relocation ([#11238](https://github.com/opensearch-project/OpenSearch/pull/11238))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,16 @@

package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.stats.ClusterStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.index.Index;
import org.opensearch.index.IndexService;
import org.opensearch.index.ReplicationStats;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.SegmentReplicationState;
Expand All @@ -20,10 +26,12 @@
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.disruption.SlowClusterStateProcessing;

import java.nio.file.Path;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
* This class runs tests with remote store + segRep while blocking file downloads
Expand Down Expand Up @@ -111,6 +119,75 @@ public void testCancelReplicationWhileFetchingMetadata() throws Exception {
cleanupRepo();
}

public void testUpdateVisibleCheckpointWithLaggingClusterStateUpdates_primaryRelocation() throws Exception {
Path location = randomRepoPath().toAbsolutePath();
Settings nodeSettings = Settings.builder().put(buildRemoteStoreNodeAttributes(location, 0d, "metadata", Long.MAX_VALUE)).build();
internalCluster().startClusterManagerOnlyNode(nodeSettings);
internalCluster().startDataOnlyNodes(2, nodeSettings);
final Settings indexSettings = Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build();
createIndex(INDEX_NAME, indexSettings);
ensureGreen(INDEX_NAME);
final Set<String> dataNodeNames = internalCluster().getDataNodeNames();
final String replicaNode = getNode(dataNodeNames, false);
final String oldPrimary = getNode(dataNodeNames, true);

// index a doc.
client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", randomInt()).get();
refresh(INDEX_NAME);

logger.info("--> start another node");
final String newPrimary = internalCluster().startDataOnlyNode(nodeSettings);
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("4")
.get();
assertEquals(clusterHealthResponse.isTimedOut(), false);

SlowClusterStateProcessing disruption = new SlowClusterStateProcessing(replicaNode, random(), 0, 0, 1000, 2000);
internalCluster().setDisruptionScheme(disruption);
disruption.startDisrupting();

// relocate the primary
logger.info("--> relocate the shard");
client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand(INDEX_NAME, 0, oldPrimary, newPrimary))
.execute()
.actionGet();
clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.setTimeout(new TimeValue(5, TimeUnit.MINUTES))
.execute()
.actionGet();
assertEquals(clusterHealthResponse.isTimedOut(), false);

IndexShard newPrimary_shard = getIndexShard(newPrimary, INDEX_NAME);
IndexShard replica = getIndexShard(replicaNode, INDEX_NAME);
assertBusy(() -> {
assertEquals(
newPrimary_shard.getLatestReplicationCheckpoint().getSegmentInfosVersion(),
replica.getLatestReplicationCheckpoint().getSegmentInfosVersion()
);
});

assertBusy(() -> {
ClusterStatsResponse clusterStatsResponse = client().admin().cluster().prepareClusterStats().get();
ReplicationStats replicationStats = clusterStatsResponse.getIndicesStats().getSegments().getReplicationStats();
assertEquals(0L, replicationStats.maxBytesBehind);
assertEquals(0L, replicationStats.maxReplicationLag);
assertEquals(0L, replicationStats.totalBytesBehind);
});
disruption.stopDisrupting();
disableRepoConsistencyCheck("Remote Store Creates System Repository");
cleanupRepo();
}

private String getNode(Set<String> dataNodeNames, boolean primary) {
assertEquals(2, dataNodeNames.size());
for (String name : dataNodeNames) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1322,8 +1322,10 @@ private SegmentReplicationShardStats buildShardStats(final String allocationId,
allocationId,
cps.checkpointTimers.size(),
bytesBehind,
cps.checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::time).max().orElse(0),
cps.checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::totalElapsedTime).max().orElse(0),
bytesBehind > 0L ? cps.checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::time).max().orElse(0) : 0,
bytesBehind > 0L
? cps.checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::totalElapsedTime).max().orElse(0)
: 0,
cps.lastCompletedReplicationLag
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1764,8 +1764,8 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp
if (isSegmentReplicationAllowed() == false) {
return false;
}
ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint();
if (localCheckpoint.isAheadOf(requestCheckpoint)) {
final ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint();
if (requestCheckpoint.isAheadOf(localCheckpoint) == false) {
logger.trace(
() -> new ParameterizedMessage(
"Ignoring new replication checkpoint - Shard is already on checkpoint {} that is ahead of {}",
Expand All @@ -1775,12 +1775,6 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp
);
return false;
}
if (localCheckpoint.equals(requestCheckpoint)) {
logger.trace(
() -> new ParameterizedMessage("Ignoring new replication checkpoint - Shard is already on checkpoint {}", requestCheckpoint)
);
return false;
}
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,21 @@
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.action.support.ChannelActionListener;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardState;
Expand Down Expand Up @@ -61,7 +65,7 @@
*
* @opensearch.internal
*/
public class SegmentReplicationTargetService implements IndexEventListener {
public class SegmentReplicationTargetService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener {

private static final Logger logger = LogManager.getLogger(SegmentReplicationTargetService.class);

Expand Down Expand Up @@ -144,6 +148,53 @@ public SegmentReplicationTargetService(
);
}

@Override
protected void doStart() {
if (DiscoveryNode.isDataNode(clusterService.getSettings())) {
clusterService.addListener(this);
}
}

@Override
protected void doStop() {
if (DiscoveryNode.isDataNode(clusterService.getSettings())) {
clusterService.removeListener(this);
}
}

@Override
protected void doClose() throws IOException {

}
Comment on lines +166 to +168
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should doClose replicate doStop?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't need to as stop is invoked before close when the node is shut down from Node#close. However, I don't see this getting explicitly closed there along with multiple multiple other services extending AbstractLifecycleComponent, ex PeerRecoverySourceService. Unless I'm missing something here, will raise a pr to get these added.


@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.routingTableChanged()) {
for (IndexService indexService : indicesService) {
if (indexService.getIndexSettings().isSegRepEnabled() && event.indexRoutingTableChanged(indexService.index().getName())) {
for (IndexShard shard : indexService) {
if (shard.routingEntry().primary() == false) {
// for this shard look up its primary routing, if it has completed a relocation trigger replication
final String previousNode = event.previousState()
.routingTable()
.shardRoutingTable(shard.shardId())
.primaryShard()
.currentNodeId();
final String currentNode = event.state()
.routingTable()
.shardRoutingTable(shard.shardId())
.primaryShard()
.currentNodeId();
if (previousNode.equals(currentNode) == false) {
processLatestReceivedCheckpoint(shard, Thread.currentThread());
}
}
}
}
}
}
}

/**
* Cancel any replications on this node for a replica that is about to be closed.
*/
Expand Down Expand Up @@ -395,7 +446,7 @@ private DiscoveryNode getPrimaryNode(ShardRouting primaryShard) {
// visible to tests
protected boolean processLatestReceivedCheckpoint(IndexShard replicaShard, Thread thread) {
final ReplicationCheckpoint latestPublishedCheckpoint = latestReceivedCheckpoint.get(replicaShard.shardId());
if (latestPublishedCheckpoint != null && latestPublishedCheckpoint.isAheadOf(replicaShard.getLatestReplicationCheckpoint())) {
if (latestPublishedCheckpoint != null) {
logger.trace(
() -> new ParameterizedMessage(
"Processing latest received checkpoint for shard {} {}",
Expand Down
2 changes: 2 additions & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -1414,6 +1414,7 @@ public Node start() throws NodeValidationException {
assert transportService.getLocalNode().equals(localNodeFactory.getNode())
: "transportService has a different local node than the factory provided";
injector.getInstance(PeerRecoverySourceService.class).start();
injector.getInstance(SegmentReplicationTargetService.class).start();
injector.getInstance(SegmentReplicationSourceService.class).start();

final RemoteClusterStateService remoteClusterStateService = injector.getInstance(RemoteClusterStateService.class);
Expand Down Expand Up @@ -1602,6 +1603,7 @@ public synchronized void close() throws IOException {
toClose.add(injector.getInstance(IndicesStore.class));
toClose.add(injector.getInstance(PeerRecoverySourceService.class));
toClose.add(injector.getInstance(SegmentReplicationSourceService.class));
toClose.add(injector.getInstance(SegmentReplicationTargetService.class));
toClose.add(() -> stopWatch.stop().start("cluster"));
toClose.add(injector.getInstance(ClusterService.class));
toClose.add(() -> stopWatch.stop().start("node_connections_service"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,26 @@
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.index.IndexService;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.replication.TestReplicationSource;
import org.opensearch.index.shard.IndexShard;
Expand Down Expand Up @@ -51,6 +59,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -91,6 +100,8 @@ public class SegmentReplicationTargetServiceTests extends IndexShardTestCase {
private SegmentReplicationState state;
private ReplicationCheckpoint initialCheckpoint;

private ClusterState clusterState;

private static final long TRANSPORT_TIMEOUT = 30000;// 30sec

@Override
Expand Down Expand Up @@ -129,7 +140,7 @@ public void setUp() throws Exception {

indicesService = mock(IndicesService.class);
ClusterService clusterService = mock(ClusterService.class);
ClusterState clusterState = mock(ClusterState.class);
clusterState = mock(ClusterState.class);
RoutingTable mockRoutingTable = mock(RoutingTable.class);
when(clusterService.state()).thenReturn(clusterState);
when(clusterState.routingTable()).thenReturn(mockRoutingTable);
Expand Down Expand Up @@ -465,9 +476,22 @@ public void testStartReplicationListenerFailure() throws InterruptedException {
verify(spy, (never())).updateVisibleCheckpoint(eq(0L), eq(replicaShard));
}

public void testDoNotProcessLatestCheckpointIfItIsbehind() {
sut.updateLatestReceivedCheckpoint(replicaShard.getLatestReplicationCheckpoint(), replicaShard);
assertFalse(sut.processLatestReceivedCheckpoint(replicaShard, null));
public void testDoNotProcessLatestCheckpointIfCheckpointIsBehind() {
SegmentReplicationTargetService service = spy(sut);
doReturn(mock(SegmentReplicationTarget.class)).when(service).startReplication(any(), any(), any());
ReplicationCheckpoint checkpoint = replicaShard.getLatestReplicationCheckpoint();
service.updateLatestReceivedCheckpoint(checkpoint, replicaShard);
service.processLatestReceivedCheckpoint(replicaShard, null);
verify(service, times(0)).startReplication(eq(replicaShard), eq(checkpoint), any());
}

public void testProcessLatestCheckpointIfCheckpointAhead() {
SegmentReplicationTargetService service = spy(sut);
doNothing().when(service).startReplication(any());
doReturn(mock(SegmentReplicationTarget.class)).when(service).startReplication(any(), any(), any());
service.updateLatestReceivedCheckpoint(aheadCheckpoint, replicaShard);
service.processLatestReceivedCheckpoint(replicaShard, null);
verify(service, times(1)).startReplication(eq(replicaShard), eq(aheadCheckpoint), any());
}

public void testOnNewCheckpointInvokedOnClosedShardDoesNothing() throws IOException {
Expand Down Expand Up @@ -617,4 +641,46 @@ public void onReplicationFailure(SegmentReplicationState state, ReplicationFaile
target.cancel("test");
sut.startReplication(target);
}

public void testProcessCheckpointOnClusterStateUpdate() {
// set up mocks on indicies & index service to return our replica's index & shard.
IndexService indexService = mock(IndexService.class);
when(indexService.iterator()).thenReturn(Set.of(replicaShard).iterator());
when(indexService.getIndexSettings()).thenReturn(replicaShard.indexSettings());
when(indexService.index()).thenReturn(replicaShard.routingEntry().index());
when(indicesService.iterator()).thenReturn(Set.of(indexService).iterator());

// create old & new cluster states
final String targetNodeId = "targetNodeId";
ShardRouting initialRouting = primaryShard.routingEntry().relocate(targetNodeId, 0L);
assertEquals(ShardRoutingState.RELOCATING, initialRouting.state());

ShardRouting targetRouting = ShardRouting.newUnassigned(
primaryShard.shardId(),
true,
RecoverySource.PeerRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.REINITIALIZED, "test")
).initialize(targetNodeId, initialRouting.allocationId().getId(), 0L).moveToStarted();
assertEquals(targetNodeId, targetRouting.currentNodeId());
assertEquals(ShardRoutingState.STARTED, targetRouting.state());
ClusterState oldState = ClusterState.builder(ClusterName.DEFAULT)
.routingTable(
RoutingTable.builder()
.add(IndexRoutingTable.builder(primaryShard.shardId().getIndex()).addShard(initialRouting).build())
.build()
)
.build();
ClusterState newState = ClusterState.builder(ClusterName.DEFAULT)
.routingTable(
RoutingTable.builder()
.add(IndexRoutingTable.builder(primaryShard.shardId().getIndex()).addShard(targetRouting).build())
.build()
)
.build();

// spy so we can verify process is invoked
SegmentReplicationTargetService spy = spy(sut);
spy.clusterChanged(new ClusterChangedEvent("ignored", oldState, newState));
verify(spy, times(1)).processLatestReceivedCheckpoint(eq(replicaShard), any());
}
}