
Commit cdf5e1a

[Backport 2.x] Segment Replication - Fix ShardLockObtained error during corruption cases #10370 (#10418)
* Segment Replication - Fix ShardLockObtained error during corruption cases (#10370)

  This change fixes a bug where shards could not be recreated locally after corruption. This occurred because the store was not decref'd to 0 when the commit on close failed with a corruption exception.

* Remove extra logs

* Remove flaky assertion on store refcount

* Remove flaky test

* PR feedback: remove hacky handling of corruption when fetching metadata. Replication failures now check for store corruption and fail the shard accordingly. This commit also fixes logging in NRTReplicationEngine.

* Fix unit test

* Fix test failure in testSegRepSucceedsOnPreviousCopiedFiles. This test broke because we invoked target.indexShard on a closed replicationTarget; in that case we can assume the store is not corrupt.

* spotless

* Revert flaky IT

* Fix flakiness by expecting RTE when check index fails

* Reintroduce ITs and use the recoveries API instead of waiting on shard state

* Fix edge case where flush failures would not get reported as corruption

* Fix breaking change only on main

Signed-off-by: Marc Handalian <[email protected]>
1 parent fea095d commit cdf5e1a
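The heart of the fix is the engine close path: even if the final commit fails with a corruption error, the store must still be decref'd so its shard lock is released and the shard can be recreated locally. The following standalone Java sketch illustrates that pattern under stated assumptions; names such as SketchEngine and RefCountedStore are hypothetical stand-ins, not the actual OpenSearch classes. The real change is in NRTReplicationEngine#closeNoLock in the diff below.

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the close path this commit fixes.
// It is not the real NRTReplicationEngine; it only shows that the store
// reference must be released even when the final commit fails.
class SketchEngine {

    // Stand-in for the ref counting done by org.opensearch.index.store.Store.
    static final class RefCountedStore {
        private final AtomicInteger refCount = new AtomicInteger(1);
        private volatile boolean corrupted = false;

        void markCorrupted(IOException cause) { corrupted = true; }
        boolean isMarkedCorrupted() { return corrupted; }
        void decRef() { refCount.decrementAndGet(); }
        int refCount() { return refCount.get(); }
    }

    final RefCountedStore store = new RefCountedStore();

    // Before the fix, a commit failure on close skipped the decRef entirely,
    // leaving the shard lock held and causing ShardLockObtainFailedException
    // when the shard was later recreated locally.
    void close() {
        try {
            commitOnClose(); // may throw if the directory is corrupted
        } catch (IOException e) {
            if (store.isMarkedCorrupted() == false) {
                store.markCorrupted(e); // surface corruption instead of losing it
            }
        } finally {
            store.decRef(); // always release, so the ref count can reach 0
        }
    }

    private void commitOnClose() throws IOException {
        throw new IOException("simulated corruption during the final commit");
    }

    public static void main(String[] args) {
        SketchEngine engine = new SketchEngine();
        engine.close();
        // Even though the commit failed, the store ref was released.
        System.out.println("store refCount after close = " + engine.store.refCount());
        System.out.println("store marked corrupted = " + engine.store.isMarkedCorrupted());
    }
}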

10 files changed (+245, -18 lines)

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -63,6 +63,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Fix remove ingest processor handing ignore_missing parameter not correctly ([10089](https://github.com/opensearch-project/OpenSearch/pull/10089))
 - Fix registration and initialization of multiple extensions ([10256](https://github.com/opensearch-project/OpenSearch/pull/10256))
 - Fix circular dependency in Settings initialization ([10194](https://github.com/opensearch-project/OpenSearch/pull/10194))
+- Fix Segment Replication ShardLockObtainFailedException bug during index corruption ([10370](https://github.com/opensearch-project/OpenSearch/pull/10370))

 ### Security

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java

Lines changed: 3 additions & 2 deletions
@@ -197,9 +197,10 @@ protected IndexShard getIndexShard(String node, ShardId shardId, String indexName
     protected IndexShard getIndexShard(String node, String indexName) {
         final Index index = resolveIndex(indexName);
         IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
-        IndexService indexService = indicesService.indexServiceSafe(index);
+        IndexService indexService = indicesService.indexService(index);
+        assertNotNull(indexService);
         final Optional<Integer> shardId = indexService.shardIds().stream().findFirst();
-        return indexService.getShard(shardId.get());
+        return shardId.map(indexService::getShard).orElse(null);
     }

     protected boolean segmentReplicationWithRemoteEnabled() {

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java

Lines changed: 135 additions & 0 deletions
@@ -24,6 +24,7 @@
 import org.apache.lucene.util.BytesRef;
 import org.opensearch.action.admin.indices.alias.Alias;
 import org.opensearch.action.admin.indices.flush.FlushRequest;
+import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
 import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
 import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.opensearch.action.get.GetResponse;
@@ -58,6 +59,7 @@
 import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.unit.TimeValue;
+import org.opensearch.core.common.bytes.BytesArray;
 import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
 import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.core.xcontent.XContentBuilder;
@@ -71,6 +73,7 @@
 import org.opensearch.index.engine.NRTReplicationReaderManager;
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.indices.recovery.FileChunkRequest;
+import org.opensearch.indices.recovery.RecoveryState;
 import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
 import org.opensearch.indices.replication.common.ReplicationType;
 import org.opensearch.node.NodeClosedException;
@@ -82,6 +85,7 @@
 import org.opensearch.test.InternalTestCluster;
 import org.opensearch.test.OpenSearchIntegTestCase;
 import org.opensearch.test.transport.MockTransportService;
+import org.opensearch.transport.TransportRequest;
 import org.opensearch.transport.TransportService;
 import org.junit.Before;
@@ -94,6 +98,7 @@
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;

 import static java.util.Arrays.asList;
@@ -1777,4 +1782,134 @@ public void testRealtimeTermVectorRequestsUnSuccessful() throws IOException {

     }

+    public void testSendCorruptBytesToReplica() throws Exception {
+        // this test stubs transport calls specific to node-node replication.
+        assumeFalse(
+            "Skipping the test as its not compatible with segment replication with remote store.",
+            segmentReplicationWithRemoteEnabled()
+        );
+        final String primaryNode = internalCluster().startDataOnlyNode();
+        createIndex(
+            INDEX_NAME,
+            Settings.builder()
+                .put(indexSettings())
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
+                .put("index.refresh_interval", -1)
+                .build()
+        );
+        ensureYellow(INDEX_NAME);
+        final String replicaNode = internalCluster().startDataOnlyNode();
+        ensureGreen(INDEX_NAME);
+
+        MockTransportService primaryTransportService = ((MockTransportService) internalCluster().getInstance(
+            TransportService.class,
+            primaryNode
+        ));
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicBoolean failed = new AtomicBoolean(false);
+        primaryTransportService.addSendBehavior(
+            internalCluster().getInstance(TransportService.class, replicaNode),
+            (connection, requestId, action, request, options) -> {
+                if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK) && failed.getAndSet(true) == false) {
+                    FileChunkRequest req = (FileChunkRequest) request;
+                    logger.info("SENDING CORRUPT file chunk [{}] lastChunk: {}", req, req.lastChunk());
+                    TransportRequest corrupt = new FileChunkRequest(
+                        req.recoveryId(),
+                        ((FileChunkRequest) request).requestSeqNo(),
+                        ((FileChunkRequest) request).shardId(),
+                        ((FileChunkRequest) request).metadata(),
+                        ((FileChunkRequest) request).position(),
+                        new BytesArray("test"),
+                        false,
+                        0,
+                        0L
+                    );
+                    connection.sendRequest(requestId, action, corrupt, options);
+                    latch.countDown();
+                } else {
+                    connection.sendRequest(requestId, action, request, options);
+                }
+            }
+        );
+        for (int i = 0; i < 100; i++) {
+            client().prepareIndex(INDEX_NAME)
+                .setId(String.valueOf(i))
+                .setSource(jsonBuilder().startObject().field("field", i).endObject())
+                .get();
+        }
+        final long originalRecoveryTime = getRecoveryStopTime(replicaNode);
+        assertNotEquals(originalRecoveryTime, 0);
+        refresh(INDEX_NAME);
+        latch.await();
+        assertTrue(failed.get());
+        waitForNewPeerRecovery(replicaNode, originalRecoveryTime);
+        // reset checkIndex to ensure our original shard doesn't throw
+        resetCheckIndexStatus();
+        waitForSearchableDocs(100, primaryNode, replicaNode);
+    }
+
+    public void testWipeSegmentBetweenSyncs() throws Exception {
+        internalCluster().startClusterManagerOnlyNode();
+        final String primaryNode = internalCluster().startDataOnlyNode();
+        createIndex(
+            INDEX_NAME,
+            Settings.builder()
+                .put(indexSettings())
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
+                .put("index.refresh_interval", -1)
+                .build()
+        );
+        ensureYellow(INDEX_NAME);
+        final String replicaNode = internalCluster().startDataOnlyNode();
+        ensureGreen(INDEX_NAME);
+
+        for (int i = 0; i < 10; i++) {
+            client().prepareIndex(INDEX_NAME)
+                .setId(String.valueOf(i))
+                .setSource(jsonBuilder().startObject().field("field", i).endObject())
+                .get();
+        }
+        refresh(INDEX_NAME);
+        ensureGreen(INDEX_NAME);
+        final long originalRecoveryTime = getRecoveryStopTime(replicaNode);
+
+        final IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME);
+        waitForSearchableDocs(INDEX_NAME, 10, List.of(replicaNode));
+        indexShard.store().directory().deleteFile("_0.si");
+
+        for (int i = 11; i < 21; i++) {
+            client().prepareIndex(INDEX_NAME)
+                .setId(String.valueOf(i))
+                .setSource(jsonBuilder().startObject().field("field", i).endObject())
+                .get();
+        }
+        refresh(INDEX_NAME);
+        waitForNewPeerRecovery(replicaNode, originalRecoveryTime);
+        resetCheckIndexStatus();
+        waitForSearchableDocs(20, primaryNode, replicaNode);
+    }
+
+    private void waitForNewPeerRecovery(String replicaNode, long originalRecoveryTime) throws Exception {
+        assertBusy(() -> {
+            // assert we have a peer recovery after the original
+            final long time = getRecoveryStopTime(replicaNode);
+            assertNotEquals(time, 0);
+            assertNotEquals(originalRecoveryTime, time);
+
+        }, 1, TimeUnit.MINUTES);
+    }
+
+    private long getRecoveryStopTime(String nodeName) {
+        final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(INDEX_NAME).get();
+        final List<RecoveryState> recoveryStates = recoveryResponse.shardRecoveryStates().get(INDEX_NAME);
+        logger.info("Recovery states {}", recoveryResponse);
+        for (RecoveryState recoveryState : recoveryStates) {
+            if (recoveryState.getTargetNode().getName().equals(nodeName)) {
+                return recoveryState.getTimer().stopTime();
+            }
+        }
+        return 0L;
+    }
 }

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java

Lines changed: 22 additions & 5 deletions
@@ -416,6 +416,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
            try {
                commitSegmentInfos();
            } catch (IOException e) {
+                maybeFailEngine("flush", e);
                throw new FlushFailedEngineException(shardId, e);
            } finally {
                flushLock.unlock();
@@ -489,13 +490,29 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
                    latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
                    latestSegmentInfos.changed();
                }
-                commitSegmentInfos(latestSegmentInfos);
-                IOUtils.close(readerManager, translogManager, store::decRef);
+                try {
+                    commitSegmentInfos(latestSegmentInfos);
+                } catch (IOException e) {
+                    // mark the store corrupted unless we are closing as result of engine failure.
+                    // in this case Engine#failShard will handle store corruption.
+                    if (failEngineLock.isHeldByCurrentThread() == false && store.isMarkedCorrupted() == false) {
+                        try {
+                            store.markStoreCorrupted(e);
+                        } catch (IOException ex) {
+                            logger.warn("Unable to mark store corrupted", ex);
+                        }
+                    }
+                }
+                IOUtils.close(readerManager, translogManager);
            } catch (Exception e) {
-                logger.warn("failed to close engine", e);
+                logger.error("failed to close engine", e);
            } finally {
-                logger.debug("engine closed [{}]", reason);
-                closedLatch.countDown();
+                try {
+                    store.decRef();
+                    logger.debug("engine closed [{}]", reason);
+                } finally {
+                    closedLatch.countDown();
+                }
            }
        }
    }

server/src/main/java/org/opensearch/index/store/Store.java

Lines changed: 7 additions & 1 deletion
@@ -385,7 +385,13 @@ public MetadataSnapshot getMetadata(SegmentInfos segmentInfos) throws IOExceptio
      */
     public Map<String, StoreFileMetadata> getSegmentMetadataMap(SegmentInfos segmentInfos) throws IOException {
         assert indexSettings.isSegRepEnabled();
-        return loadMetadata(segmentInfos, directory, logger, true).fileMetadata;
+        failIfCorrupted();
+        try {
+            return loadMetadata(segmentInfos, directory, logger, true).fileMetadata;
+        } catch (NoSuchFileException | CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
+            markStoreCorrupted(ex);
+            throw ex;
+        }
     }

     /**

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java

Lines changed: 4 additions & 7 deletions
@@ -18,7 +18,6 @@
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.opensearch.OpenSearchCorruptionException;
-import org.opensearch.OpenSearchException;
 import org.opensearch.action.StepListener;
 import org.opensearch.common.UUIDs;
 import org.opensearch.common.lucene.Lucene;
@@ -261,9 +260,7 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse)
        } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
            // this is a fatal exception at this stage.
            // this means we transferred files from the remote that have not be checksummed and they are
-            // broken. We have to clean up this shard entirely, remove all files and bubble it up to the
-            // source shard since this index might be broken there as well? The Source can handle this and checks
-            // its content on disk if possible.
+            // broken. We have to clean up this shard entirely, remove all files and bubble it up.
            try {
                try {
                    store.removeCorruptionMarker();
@@ -279,14 +276,14 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse)
            // In this case the shard is closed at some point while updating the reader.
            // This can happen when the engine is closed in a separate thread.
            logger.warn("Shard is already closed, closing replication");
-        } catch (OpenSearchException ex) {
+        } catch (CancellableThreads.ExecutionCancelledException ex) {
            /*
             Ignore closed replication target as it can happen due to index shard closed event in a separate thread.
             In such scenario, ignore the exception
             */
-            assert cancellableThreads.isCancelled() : "Replication target closed but segment replication not cancelled";
+            assert cancellableThreads.isCancelled() : "Replication target cancelled but cancellable threads not cancelled";
        } catch (Exception ex) {
-            throw new OpenSearchCorruptionException(ex);
+            throw new ReplicationFailedException(ex);
        } finally {
            if (store != null) {
                store.decRef();

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java

Lines changed: 25 additions & 1 deletion
@@ -11,6 +11,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.apache.lucene.index.CorruptIndexException;
 import org.opensearch.ExceptionsHelper;
 import org.opensearch.OpenSearchCorruptionException;
 import org.opensearch.action.support.ChannelActionListener;
@@ -28,6 +29,7 @@
 import org.opensearch.index.shard.IndexEventListener;
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.index.shard.IndexShardState;
+import org.opensearch.index.store.Store;
 import org.opensearch.indices.IndicesService;
 import org.opensearch.indices.recovery.FileChunkRequest;
 import org.opensearch.indices.recovery.ForceSyncRequest;
@@ -46,6 +48,7 @@
 import org.opensearch.transport.TransportRequestOptions;
 import org.opensearch.transport.TransportService;

+import java.io.IOException;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
@@ -522,7 +525,7 @@ public void onResponse(Void o) {
            @Override
            public void onFailure(Exception e) {
                logger.debug("Replication failed {}", target.description());
-                if (e instanceof OpenSearchCorruptionException) {
+                if (isStoreCorrupt(target) || e instanceof CorruptIndexException || e instanceof OpenSearchCorruptionException) {
                    onGoingReplications.fail(replicationId, new ReplicationFailedException("Store corruption during replication", e), true);
                    return;
                }
@@ -531,6 +534,27 @@ public void onFailure(Exception e) {
        });
    }

+    private boolean isStoreCorrupt(SegmentReplicationTarget target) {
+        // ensure target is not already closed. In that case
+        // we can assume the store is not corrupt and that the replication
+        // event completed successfully.
+        if (target.refCount() > 0) {
+            final Store store = target.store();
+            if (store.tryIncRef()) {
+                try {
+                    return store.isMarkedCorrupted();
+                } catch (IOException ex) {
+                    logger.warn("Unable to determine if store is corrupt", ex);
+                    return false;
+                } finally {
+                    store.decRef();
+                }
+            }
+        }
+        // store already closed.
+        return false;
+    }
+
    private class FileChunkTransportRequestHandler implements TransportRequestHandler<FileChunkRequest> {

        // How many bytes we've copied since we last called RateLimiter.pause
