Skip to content

Commit c1eedb9

Browse files
committed
Segment Replication - Commit SegmentInfos on replicas when new generation received.
This change updates NRTReplicationEngine to trigger a commit when a new segment generation is received from the primary. It also updates the engine to commit when a replica is closed so that it can restart from the same segments. Signed-off-by: Marc Handalian <[email protected]>
1 parent e7f9de5 commit c1eedb9

File tree

6 files changed

+279
-124
lines changed

6 files changed

+279
-124
lines changed

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ public class NRTReplicationEngine extends Engine {
5454
private final LocalCheckpointTracker localCheckpointTracker;
5555
private final WriteOnlyTranslogManager translogManager;
5656

57+
private volatile long lastReceivedGen = SequenceNumbers.NO_OPS_PERFORMED;
58+
5759
private static final int SI_COUNTER_INCREMENT = 10;
5860

5961
public NRTReplicationEngine(EngineConfig engineConfig) {
@@ -120,14 +122,16 @@ public TranslogManager translogManager() {
120122

121123
public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException {
122124
// Update the current infos reference on the Engine's reader.
125+
long incomingGeneration = infos.getGeneration();
123126
readerManager.updateSegments(infos);
124127

125128
// Commit the latest infos and roll the translog generation whenever the incoming generation differs from the
126129
// last generation received. We can still refresh with incoming SegmentInfos that are not part of a commit point.
127-
if (infos.getGeneration() > lastCommittedSegmentInfos.getGeneration()) {
128-
this.lastCommittedSegmentInfos = infos;
130+
if (incomingGeneration != lastReceivedGen) {
131+
commitSegmentInfos();
129132
translogManager.rollTranslogGeneration();
130133
}
134+
lastReceivedGen = incomingGeneration;
131135
localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
132136
}
133137

@@ -141,20 +145,16 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th
141145
*
142146
* @throws IOException - When there is an IO error committing the SegmentInfos.
143147
*/
144-
public void commitSegmentInfos() throws IOException {
145-
// TODO: This method should wait for replication events to finalize.
146-
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
147-
/*
148-
This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied
149-
from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is
150-
used to generate new segment file names. The ideal solution is to identify the counter from previous primary.
151-
*/
152-
latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
153-
latestSegmentInfos.changed();
154-
store.commitSegmentInfos(latestSegmentInfos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
148+
private void commitSegmentInfos(SegmentInfos infos) throws IOException {
149+
store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
150+
this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
155151
translogManager.syncTranslog();
156152
}
157153

154+
protected void commitSegmentInfos() throws IOException {
155+
commitSegmentInfos(getLatestSegmentInfos());
156+
}
157+
158158
@Override
159159
public String getHistoryUUID() {
160160
return loadHistoryUUID(lastCommittedSegmentInfos.userData);
@@ -354,6 +354,15 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
354354
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread()
355355
: "Either the write lock must be held or the engine must be currently be failing itself";
356356
try {
357+
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
358+
/*
359+
This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied
360+
from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is
361+
used to generate new segment file names. The ideal solution is to identify the counter from previous primary.
362+
*/
363+
latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
364+
latestSegmentInfos.changed();
365+
commitSegmentInfos(latestSegmentInfos);
357366
IOUtils.close(readerManager, translogManager, store::decRef);
358367
} catch (Exception e) {
359368
logger.warn("failed to close engine", e);

server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
7474
* @throws IOException - When Refresh fails with an IOException.
7575
*/
7676
public synchronized void updateSegments(SegmentInfos infos) throws IOException {
77+
infos.updateGeneration(currentInfos);
7778
currentInfos = infos;
7879
maybeRefresh();
7980
}

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -623,7 +623,7 @@ public void updateShardState(
623623
if (indexSettings.isSegRepEnabled()) {
624624
// this Shard's engine was read only, we need to update its engine before restoring local history from xlog.
625625
assert newRouting.primary() && currentRouting.primary() == false;
626-
promoteNRTReplicaToPrimary();
626+
resetEngineToGlobalCheckpoint();
627627
}
628628
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
629629
ensurePeerRecoveryRetentionLeasesExist();
@@ -3557,7 +3557,7 @@ private void innerAcquireReplicaOperationPermit(
35573557
currentGlobalCheckpoint,
35583558
maxSeqNo
35593559
);
3560-
if (currentGlobalCheckpoint < maxSeqNo) {
3560+
if (currentGlobalCheckpoint < maxSeqNo && indexSettings.isSegRepEnabled() == false) {
35613561
resetEngineToGlobalCheckpoint();
35623562
} else {
35633563
getEngine().translogManager().rollTranslogGeneration();
@@ -4120,26 +4120,4 @@ RetentionLeaseSyncer getRetentionLeaseSyncer() {
41204120
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
41214121
return getEngine().getSegmentInfosSnapshot();
41224122
}
4123-
4124-
/**
4125-
* With segment replication enabled - prepare the shard's engine to be promoted as the new primary.
4126-
*
4127-
* If this shard is currently using a replication engine, this method:
4128-
* 1. Invokes {@link NRTReplicationEngine#commitSegmentInfos()} to ensure the engine can be reopened as writeable from the latest refresh point.
4129-
* InternalEngine opens its IndexWriter from an on-disk commit point, but this replica may have recently synced from a primary's refresh point, meaning it has documents searchable in its in-memory SegmentInfos
4130-
* that are not part of a commit point. This ensures that those documents are made part of a commit and do not need to be reindexed after promotion.
4131-
* 2. Invokes resetEngineToGlobalCheckpoint - This call performs the engine swap, opening up as a writeable engine and replays any operations in the xlog. The operations indexed from xlog here will be
4132-
* any ack'd writes that were not copied to this replica before promotion.
4133-
*/
4134-
private void promoteNRTReplicaToPrimary() {
4135-
assert shardRouting.primary() && indexSettings.isSegRepEnabled();
4136-
getReplicationEngine().ifPresentOrElse(engine -> {
4137-
try {
4138-
engine.commitSegmentInfos();
4139-
resetEngineToGlobalCheckpoint();
4140-
} catch (IOException e) {
4141-
throw new EngineException(shardId, "Unable to update replica to writeable engine, failing shard", e);
4142-
}
4143-
}, () -> { throw new EngineException(shardId, "Expected replica engine to be of type NRTReplicationEngine"); });
4144-
}
41454123
}

server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java

Lines changed: 54 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,8 @@
1515
import org.opensearch.cluster.metadata.IndexMetadata;
1616
import org.opensearch.common.concurrent.GatedCloseable;
1717
import org.opensearch.common.lucene.Lucene;
18-
import org.opensearch.common.lucene.search.Queries;
1918
import org.opensearch.common.settings.Settings;
2019
import org.opensearch.index.IndexSettings;
21-
import org.opensearch.index.mapper.ParsedDocument;
2220
import org.opensearch.index.seqno.LocalCheckpointTracker;
2321
import org.opensearch.index.seqno.SequenceNumbers;
2422
import org.opensearch.index.store.Store;
@@ -36,17 +34,21 @@
3634
import java.util.stream.Collectors;
3735

3836
import static org.hamcrest.Matchers.equalTo;
39-
import static org.hamcrest.Matchers.notNullValue;
4037
import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
4138
import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
4239
import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO;
4340

4441
public class NRTReplicationEngineTests extends EngineTestCase {
4542

43+
private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings(
44+
"index",
45+
Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build()
46+
);
47+
4648
public void testCreateEngine() throws IOException {
4749
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
4850
try (
49-
final Store nrtEngineStore = createStore();
51+
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
5052
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
5153
) {
5254
final SegmentInfos latestSegmentInfos = nrtEngine.getLatestSegmentInfos();
@@ -70,7 +72,7 @@ public void testEngineWritesOpsToTranslog() throws Exception {
7072
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
7173

7274
try (
73-
final Store nrtEngineStore = createStore();
75+
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
7476
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
7577
) {
7678
List<Engine.Operation> operations = generateHistoryOnReplica(
@@ -104,88 +106,63 @@ public void testEngineWritesOpsToTranslog() throws Exception {
104106
}
105107
}
106108

107-
public void testUpdateSegments() throws Exception {
109+
public void testUpdateSegments_CommitOnGenIncrease() throws IOException {
108110
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
109111

110112
try (
111-
final Store nrtEngineStore = createStore();
113+
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
112114
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
113115
) {
114-
// add docs to the primary engine.
115-
List<Engine.Operation> operations = generateHistoryOnReplica(
116-
between(1, 500),
117-
randomBoolean(),
118-
randomBoolean(),
119-
randomBoolean(),
120-
Engine.Operation.TYPE.INDEX
121-
);
122-
123-
for (Engine.Operation op : operations) {
124-
applyOperation(engine, op);
125-
applyOperation(nrtEngine, op);
126-
}
127-
128-
engine.refresh("test");
129-
130-
final SegmentInfos latestPrimaryInfos = engine.getLatestSegmentInfos();
131-
nrtEngine.updateSegments(latestPrimaryInfos, engine.getProcessedLocalCheckpoint());
132-
assertMatchingSegmentsAndCheckpoints(nrtEngine, latestPrimaryInfos);
133-
134-
// assert a doc from the operations exists.
135-
final ParsedDocument parsedDoc = createParsedDoc(operations.stream().findFirst().get().id(), null);
136-
try (Engine.GetResult getResult = engine.get(newGet(true, parsedDoc), engine::acquireSearcher)) {
137-
assertThat(getResult.exists(), equalTo(true));
138-
assertThat(getResult.docIdAndVersion(), notNullValue());
139-
}
140-
141-
try (Engine.GetResult getResult = nrtEngine.get(newGet(true, parsedDoc), nrtEngine::acquireSearcher)) {
142-
assertThat(getResult.exists(), equalTo(true));
143-
assertThat(getResult.docIdAndVersion(), notNullValue());
144-
}
116+
// assume we start at the same gen.
117+
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());
118+
assertEquals(nrtEngine.getLatestSegmentInfos().getGeneration(), nrtEngine.getLastCommittedSegmentInfos().getGeneration());
119+
assertEquals(engine.getLatestSegmentInfos().getGeneration(), nrtEngine.getLatestSegmentInfos().getGeneration());
120+
121+
// flush the primary engine - we don't need any segments, just force a new commit point.
122+
engine.flush(true, true);
123+
assertEquals(3, engine.getLatestSegmentInfos().getGeneration());
124+
nrtEngine.updateSegments(engine.getLatestSegmentInfos(), engine.getProcessedLocalCheckpoint());
125+
assertEquals(4, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
126+
assertEquals(4, nrtEngine.getLatestSegmentInfos().getGeneration());
127+
}
128+
}
145129

146-
// Flush the primary and update the NRTEngine with the latest committed infos.
147-
engine.flush();
148-
nrtEngine.translogManager().syncTranslog(); // to advance persisted checkpoint
130+
public void updateSegments_replicaAtLowerCommitGen() throws IOException {
131+
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
149132

150-
Set<Long> seqNos = operations.stream().map(Engine.Operation::seqNo).collect(Collectors.toSet());
133+
try (
134+
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
135+
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
136+
) {
137+
// commit the infos to push us to segments_3.
138+
nrtEngine.commitSegmentInfos();
151139

152-
nrtEngine.ensureOpen();
153-
try (
154-
Translog.Snapshot snapshot = assertAndGetInternalTranslogManager(nrtEngine.translogManager()).getTranslog().newSnapshot()
155-
) {
156-
assertThat(snapshot.totalOperations(), equalTo(operations.size()));
157-
assertThat(
158-
TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()),
159-
equalTo(seqNos)
160-
);
161-
}
140+
// update the replica with a lower gen.
141+
assertEquals(2, engine.getLatestSegmentInfos().getGeneration());
142+
nrtEngine.updateSegments(engine.getLatestSegmentInfos(), engine.getProcessedLocalCheckpoint());
143+
assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
144+
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());
145+
}
146+
}
162147

163-
final SegmentInfos primaryInfos = engine.getLastCommittedSegmentInfos();
164-
nrtEngine.updateSegments(primaryInfos, engine.getProcessedLocalCheckpoint());
165-
assertMatchingSegmentsAndCheckpoints(nrtEngine, primaryInfos);
148+
public void updateSegments_ReplicaAlreadyAtCommitGen() throws IOException {
149+
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
166150

167-
assertEquals(
168-
assertAndGetInternalTranslogManager(nrtEngine.translogManager()).getTranslog().getGeneration().translogFileGeneration,
169-
assertAndGetInternalTranslogManager(engine.translogManager()).getTranslog().getGeneration().translogFileGeneration
170-
);
151+
try (
152+
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
153+
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
154+
) {
155+
// commit the infos to push us to segments_3.
156+
nrtEngine.commitSegmentInfos();
171157

172-
try (
173-
Translog.Snapshot snapshot = assertAndGetInternalTranslogManager(nrtEngine.translogManager()).getTranslog().newSnapshot()
174-
) {
175-
assertThat(snapshot.totalOperations(), equalTo(operations.size()));
176-
assertThat(
177-
TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()),
178-
equalTo(seqNos)
179-
);
180-
}
158+
// update the replica with a lower gen.
159+
assertEquals(2, engine.getLatestSegmentInfos().getGeneration());
160+
nrtEngine.updateSegments(engine.getLatestSegmentInfos(), engine.getProcessedLocalCheckpoint());
161+
assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
162+
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());
181163

182-
// Ensure the same hit count between engines.
183-
int expectedDocCount;
184-
try (final Engine.Searcher test = engine.acquireSearcher("test")) {
185-
expectedDocCount = test.count(Queries.newMatchAllQuery());
186-
assertSearcherHits(nrtEngine, expectedDocCount);
187-
}
188-
assertEngineCleanedUp(nrtEngine, assertAndGetInternalTranslogManager(nrtEngine.translogManager()).getDeletionPolicy());
164+
nrtEngine.close();
165+
assertEquals(3, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
189166
}
190167
}
191168

@@ -227,12 +204,9 @@ public void testCommitSegmentInfos() throws Exception {
227204
// This test asserts that NRTReplicationEngine#commitSegmentInfos creates a new commit point with the latest checkpoints
228205
// stored in user data.
229206
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
230-
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(
231-
"index",
232-
Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build()
233-
);
207+
234208
try (
235-
final Store nrtEngineStore = createStore(indexSettings, newDirectory());
209+
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
236210
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
237211
) {
238212
List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean())

0 commit comments

Comments
 (0)