Skip to content

Commit 568c193

Browse files
Write shard level metadata blob when snapshotting searchable snapshot indexes (#13190)
* fix snapshot status Signed-off-by: panguixin <[email protected]> * add change log Signed-off-by: panguixin <[email protected]> * Fix spotless violations Signed-off-by: Andrew Ross <[email protected]> --------- Signed-off-by: panguixin <[email protected]> Signed-off-by: Andrew Ross <[email protected]> Co-authored-by: Andrew Ross <[email protected]>
1 parent 3106ea5 commit 568c193

File tree

5 files changed

+93
-84
lines changed

5 files changed

+93
-84
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3737
- Switch to iterative version of WKT format parser ([#14086](https://github.com/opensearch-project/OpenSearch/pull/14086))
3838
- Fix the computed max shards of cluster to avoid int overflow ([#14155](https://github.com/opensearch-project/OpenSearch/pull/14155))
3939
- Fixed rest-high-level client searchTemplate & mtermVectors endpoints to have a leading slash ([#14465](https://github.com/opensearch-project/OpenSearch/pull/14465))
40+
- Write shard level metadata blob when snapshotting searchable snapshot indexes ([#13190](https://github.com/opensearch-project/OpenSearch/pull/13190))
4041

4142
### Security
4243

server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
1818
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
1919
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
20+
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
2021
import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest;
2122
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
2223
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder;
@@ -53,11 +54,13 @@
5354
import java.nio.file.Files;
5455
import java.nio.file.Path;
5556
import java.util.Arrays;
57+
import java.util.HashMap;
5658
import java.util.List;
5759
import java.util.Map;
5860
import java.util.Set;
5961
import java.util.concurrent.TimeUnit;
6062
import java.util.stream.Collectors;
63+
import java.util.stream.IntStream;
6164
import java.util.stream.Stream;
6265
import java.util.stream.StreamSupport;
6366

@@ -132,21 +135,24 @@ public void testCreateSearchableSnapshot() throws Exception {
132135

133136
public void testSnapshottingSearchableSnapshots() throws Exception {
134137
final String repoName = "test-repo";
138+
final String initSnapName = "initial-snapshot";
135139
final String indexName = "test-idx";
140+
final String repeatSnapNamePrefix = "test-repeated-snap-";
141+
final String repeatIndexNamePrefix = indexName + "-copy-";
136142
final Client client = client();
137143

138144
// create an index, add data, snapshot it, then delete it
139145
internalCluster().ensureAtLeastNumDataNodes(1);
140146
createIndexWithDocsAndEnsureGreen(0, 100, indexName);
141147
createRepositoryWithSettings(null, repoName);
142-
takeSnapshot(client, "initial-snapshot", repoName, indexName);
148+
takeSnapshot(client, initSnapName, repoName, indexName);
143149
deleteIndicesAndEnsureGreen(client, indexName);
144150

145151
// restore the index as a searchable snapshot
146152
internalCluster().ensureAtLeastNumSearchNodes(1);
147153
client.admin()
148154
.cluster()
149-
.prepareRestoreSnapshot(repoName, "initial-snapshot")
155+
.prepareRestoreSnapshot(repoName, initSnapName)
150156
.setRenamePattern("(.+)")
151157
.setRenameReplacement("$1-copy-0")
152158
.setStorageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT)
@@ -159,7 +165,7 @@ public void testSnapshottingSearchableSnapshots() throws Exception {
159165

160166
// Test that the searchable snapshot index can continue to be snapshotted and restored
161167
for (int i = 0; i < 4; i++) {
162-
final String repeatedSnapshotName = "test-repeated-snap-" + i;
168+
final String repeatedSnapshotName = repeatSnapNamePrefix + i;
163169
takeSnapshot(client, repeatedSnapshotName, repoName);
164170
deleteIndicesAndEnsureGreen(client, "_all");
165171
client.admin()
@@ -181,21 +187,34 @@ public void testSnapshottingSearchableSnapshots() throws Exception {
181187
final Map<String, List<String>> snapshotInfoMap = response.getSnapshots()
182188
.stream()
183189
.collect(Collectors.toMap(s -> s.snapshotId().getName(), SnapshotInfo::indices));
184-
assertEquals(
185-
Map.of(
186-
"initial-snapshot",
187-
List.of("test-idx"),
188-
"test-repeated-snap-0",
189-
List.of("test-idx-copy-0"),
190-
"test-repeated-snap-1",
191-
List.of("test-idx-copy-1"),
192-
"test-repeated-snap-2",
193-
List.of("test-idx-copy-2"),
194-
"test-repeated-snap-3",
195-
List.of("test-idx-copy-3")
196-
),
197-
snapshotInfoMap
198-
);
190+
final Map<String, List<String>> expect = new HashMap<>();
191+
expect.put(initSnapName, List.of(indexName));
192+
IntStream.range(0, 4).forEach(i -> expect.put(repeatSnapNamePrefix + i, List.of(repeatIndexNamePrefix + i)));
193+
assertEquals(expect, snapshotInfoMap);
194+
195+
String[] snapNames = new String[5];
196+
IntStream.range(0, 4).forEach(i -> snapNames[i] = repeatSnapNamePrefix + i);
197+
snapNames[4] = initSnapName;
198+
SnapshotsStatusResponse snapshotsStatusResponse = client.admin()
199+
.cluster()
200+
.prepareSnapshotStatus(repoName)
201+
.addSnapshots(snapNames)
202+
.execute()
203+
.actionGet();
204+
snapshotsStatusResponse.getSnapshots().forEach(s -> {
205+
String snapName = s.getSnapshot().getSnapshotId().getName();
206+
assertEquals(1, s.getIndices().size());
207+
assertEquals(1, s.getShards().size());
208+
if (snapName.equals("initial-snapshot")) {
209+
assertNotNull(s.getIndices().get("test-idx"));
210+
assertTrue(s.getShards().get(0).getStats().getTotalFileCount() > 0);
211+
} else {
212+
assertTrue(snapName.startsWith(repeatSnapNamePrefix));
213+
assertEquals(1, s.getIndices().size());
214+
assertNotNull(s.getIndices().get(repeatIndexNamePrefix + snapName.substring(repeatSnapNamePrefix.length())));
215+
assertEquals(0L, s.getShards().get(0).getStats().getTotalFileCount());
216+
}
217+
});
199218
}
200219

201220
/**

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2801,9 +2801,12 @@ public void snapshotShard(
28012801
long indexIncrementalSize = 0;
28022802
long indexTotalFileSize = 0;
28032803
final BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot = new LinkedBlockingQueue<>();
2804-
// If we did not find a set of files that is equal to the current commit we determine the files to upload by comparing files
2805-
// in the commit with files already in the repository
2806-
if (filesFromSegmentInfos == null) {
2804+
if (store.indexSettings().isRemoteSnapshot()) {
2805+
// If the source of the data is another remote snapshot (i.e. searchable snapshot) then no need to snapshot the shard
2806+
indexCommitPointFiles = List.of();
2807+
} else if (filesFromSegmentInfos == null) {
2808+
// If we did not find a set of files that is equal to the current commit we determine the files to upload by comparing files
2809+
// in the commit with files already in the repository
28072810
indexCommitPointFiles = new ArrayList<>();
28082811
final Collection<String> fileNames;
28092812
final Store.MetadataSnapshot metadataFromStore;

server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java

Lines changed: 37 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -276,53 +276,47 @@ private void startNewShards(SnapshotsInProgress.Entry entry, Map<ShardId, IndexS
276276
final IndexShardSnapshotStatus snapshotStatus = shardEntry.getValue();
277277
final IndexId indexId = indicesMap.get(shardId.getIndexName());
278278
assert indexId != null;
279-
if (isRemoteSnapshot(shardId)) {
280-
// If the source of the data is another remote snapshot (i.e. searchable snapshot)
281-
// then no need to snapshot the shard and can immediately notify success.
282-
notifySuccessfulSnapshotShard(snapshot, shardId, snapshotStatus.generation());
283-
} else {
284-
snapshot(
285-
shardId,
286-
snapshot,
287-
indexId,
288-
entry.userMetadata(),
289-
snapshotStatus,
290-
entry.version(),
291-
entry.remoteStoreIndexShallowCopy(),
292-
new ActionListener<>() {
293-
@Override
294-
public void onResponse(String newGeneration) {
295-
assert newGeneration != null;
296-
assert newGeneration.equals(snapshotStatus.generation());
297-
if (logger.isDebugEnabled()) {
298-
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
299-
logger.debug(
300-
"snapshot [{}] completed to [{}] with [{}] at generation [{}]",
301-
snapshot,
302-
snapshot.getRepository(),
303-
lastSnapshotStatus,
304-
snapshotStatus.generation()
305-
);
306-
}
307-
notifySuccessfulSnapshotShard(snapshot, shardId, newGeneration);
279+
snapshot(
280+
shardId,
281+
snapshot,
282+
indexId,
283+
entry.userMetadata(),
284+
snapshotStatus,
285+
entry.version(),
286+
entry.remoteStoreIndexShallowCopy(),
287+
new ActionListener<>() {
288+
@Override
289+
public void onResponse(String newGeneration) {
290+
assert newGeneration != null;
291+
assert newGeneration.equals(snapshotStatus.generation());
292+
if (logger.isDebugEnabled()) {
293+
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
294+
logger.debug(
295+
"snapshot [{}] completed to [{}] with [{}] at generation [{}]",
296+
snapshot,
297+
snapshot.getRepository(),
298+
lastSnapshotStatus,
299+
snapshotStatus.generation()
300+
);
308301
}
302+
notifySuccessfulSnapshotShard(snapshot, shardId, newGeneration);
303+
}
309304

310-
@Override
311-
public void onFailure(Exception e) {
312-
final String failure;
313-
if (e instanceof AbortedSnapshotException) {
314-
failure = "aborted";
315-
logger.debug(() -> new ParameterizedMessage("[{}][{}] aborted shard snapshot", shardId, snapshot), e);
316-
} else {
317-
failure = summarizeFailure(e);
318-
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e);
319-
}
320-
snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure);
321-
notifyFailedSnapshotShard(snapshot, shardId, failure);
305+
@Override
306+
public void onFailure(Exception e) {
307+
final String failure;
308+
if (e instanceof AbortedSnapshotException) {
309+
failure = "aborted";
310+
logger.debug(() -> new ParameterizedMessage("[{}][{}] aborted shard snapshot", shardId, snapshot), e);
311+
} else {
312+
failure = summarizeFailure(e);
313+
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e);
322314
}
315+
snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure);
316+
notifyFailedSnapshotShard(snapshot, shardId, failure);
323317
}
324-
);
325-
}
318+
}
319+
);
326320
}
327321
});
328322
}

test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@
9090

9191
import static org.opensearch.test.OpenSearchTestCase.buildNewFakeTransportAddress;
9292
import static org.opensearch.test.OpenSearchTestCase.randomIntBetween;
93-
import static org.hamcrest.Matchers.anEmptyMap;
9493
import static org.hamcrest.Matchers.containsInAnyOrder;
9594
import static org.hamcrest.Matchers.empty;
9695
import static org.hamcrest.Matchers.hasKey;
@@ -143,7 +142,7 @@ public static void assertConsistency(BlobStoreRepository repository, Executor ex
143142
}
144143
assertIndexUUIDs(repository, repositoryData);
145144
assertSnapshotUUIDs(repository, repositoryData);
146-
assertShardIndexGenerations(repository, blobContainer, repositoryData);
145+
assertShardIndexGenerations(blobContainer, repositoryData);
147146
return null;
148147
} catch (AssertionError e) {
149148
return e;
@@ -167,31 +166,24 @@ private static void assertIndexGenerations(BlobContainer repoRoot, long latestGe
167166
assertTrue(indexGenerations.length <= 2);
168167
}
169168

170-
private static void assertShardIndexGenerations(BlobStoreRepository repository, BlobContainer repoRoot, RepositoryData repositoryData)
171-
throws IOException {
169+
private static void assertShardIndexGenerations(BlobContainer repoRoot, RepositoryData repositoryData) throws IOException {
172170
final ShardGenerations shardGenerations = repositoryData.shardGenerations();
173171
final BlobContainer indicesContainer = repoRoot.children().get("indices");
174172
for (IndexId index : shardGenerations.indices()) {
175173
final List<String> gens = shardGenerations.getGens(index);
176174
if (gens.isEmpty() == false) {
177175
final BlobContainer indexContainer = indicesContainer.children().get(index.getId());
178176
final Map<String, BlobContainer> shardContainers = indexContainer.children();
179-
if (isRemoteSnapshot(repository, repositoryData, index)) {
180-
// If the source of the data is another snapshot (i.e. searchable snapshot)
181-
// then assert that there is no shard data (because it exists in the source snapshot)
182-
assertThat(shardContainers, anEmptyMap());
183-
} else {
184-
for (int i = 0; i < gens.size(); i++) {
185-
final String generation = gens.get(i);
186-
assertThat(generation, not(ShardGenerations.DELETED_SHARD_GEN));
187-
if (generation != null && generation.equals(ShardGenerations.NEW_SHARD_GEN) == false) {
188-
final String shardId = Integer.toString(i);
189-
assertThat(shardContainers, hasKey(shardId));
190-
assertThat(
191-
shardContainers.get(shardId).listBlobsByPrefix(BlobStoreRepository.INDEX_FILE_PREFIX),
192-
hasKey(BlobStoreRepository.INDEX_FILE_PREFIX + generation)
193-
);
194-
}
177+
for (int i = 0; i < gens.size(); i++) {
178+
final String generation = gens.get(i);
179+
assertThat(generation, not(ShardGenerations.DELETED_SHARD_GEN));
180+
if (generation != null && generation.equals(ShardGenerations.NEW_SHARD_GEN) == false) {
181+
final String shardId = Integer.toString(i);
182+
assertThat(shardContainers, hasKey(shardId));
183+
assertThat(
184+
shardContainers.get(shardId).listBlobsByPrefix(BlobStoreRepository.INDEX_FILE_PREFIX),
185+
hasKey(BlobStoreRepository.INDEX_FILE_PREFIX + generation)
186+
);
195187
}
196188
}
197189
}

0 commit comments

Comments
 (0)