Skip to content

Commit 14eb14a

Browse files
sachinpkaleSachin Kale
andcommitted
[Remote Store] Upload segments to remote store post refresh (opensearch-project#3460)
* Add RemoteDirectory interface to copy segment files to/from remote store Signed-off-by: Sachin Kale <[email protected]> Co-authored-by: Sachin Kale <[email protected]> * Add index level setting for remote store Signed-off-by: Sachin Kale <[email protected]> Co-authored-by: Sachin Kale <[email protected]> * Add RemoteDirectoryFactory and use RemoteDirectory instance in RefreshListener Co-authored-by: Sachin Kale <[email protected]> Signed-off-by: Sachin Kale <[email protected]> * Upload segment to remote store post refresh Signed-off-by: Sachin Kale <[email protected]> Co-authored-by: Sachin Kale <[email protected]> Signed-off-by: Sachin Kale <[email protected]>
1 parent f98340e commit 14eb14a

24 files changed

+1183
-9
lines changed

server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -675,7 +675,8 @@ public static final IndexShard newIndexShard(
675675
() -> {},
676676
RetentionLeaseSyncer.EMPTY,
677677
cbs,
678-
SegmentReplicationCheckpointPublisher.EMPTY
678+
SegmentReplicationCheckpointPublisher.EMPTY,
679+
null
679680
);
680681
}
681682

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,17 @@ public Iterator<Setting<?>> settings() {
283283
Property.Final
284284
);
285285

286+
public static final String SETTING_REMOTE_STORE = "index.remote_store";
287+
/**
288+
* Used to specify if the index data should be persisted in the remote store.
289+
*/
290+
public static final Setting<Boolean> INDEX_REMOTE_STORE_SETTING = Setting.boolSetting(
291+
SETTING_REMOTE_STORE,
292+
false,
293+
Property.IndexScope,
294+
Property.Final
295+
);
296+
286297
public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas";
287298
public static final Setting<AutoExpandReplicas> INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING;
288299

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,9 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
219219
*/
220220
public static final Map<String, Setting> FEATURE_FLAGGED_INDEX_SETTINGS = Map.of(
221221
FeatureFlags.REPLICATION_TYPE,
222-
IndexMetadata.INDEX_REPLICATION_TYPE_SETTING
222+
IndexMetadata.INDEX_REPLICATION_TYPE_SETTING,
223+
FeatureFlags.REMOTE_STORE,
224+
IndexMetadata.INDEX_REMOTE_STORE_SETTING
223225
);
224226

225227
public static final IndexScopedSettings DEFAULT_SCOPED_SETTINGS = new IndexScopedSettings(Settings.EMPTY, BUILT_IN_INDEX_SETTINGS);

server/src/main/java/org/opensearch/common/util/FeatureFlags.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ public class FeatureFlags {
2323
*/
2424
public static final String REPLICATION_TYPE = "opensearch.experimental.feature.replication_type.enabled";
2525

26+
/**
27+
* Gates the visibility of the index setting that allows persisting data to remote store along with local disk.
28+
* Once the feature is ready for production release, this feature flag can be removed.
29+
*/
30+
public static final String REMOTE_STORE = "opensearch.experimental.feature.remote_store.enabled";
31+
2632
/**
2733
* Used to test feature flags whose values are expected to be booleans.
2834
* This method returns true if the value is "true" (case-insensitive),

server/src/main/java/org/opensearch/index/IndexModule.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import org.opensearch.index.shard.SearchOperationListener;
7171
import org.opensearch.index.similarity.SimilarityService;
7272
import org.opensearch.index.store.FsDirectoryFactory;
73+
import org.opensearch.index.store.RemoteDirectoryFactory;
7374
import org.opensearch.indices.IndicesQueryCache;
7475
import org.opensearch.indices.breaker.CircuitBreakerService;
7576
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
@@ -118,6 +119,8 @@ public final class IndexModule {
118119

119120
private static final FsDirectoryFactory DEFAULT_DIRECTORY_FACTORY = new FsDirectoryFactory();
120121

122+
private static final RemoteDirectoryFactory REMOTE_DIRECTORY_FACTORY = new RemoteDirectoryFactory();
123+
121124
private static final IndexStorePlugin.RecoveryStateFactory DEFAULT_RECOVERY_STATE_FACTORY = RecoveryState::new;
122125

123126
public static final Setting<String> INDEX_STORE_TYPE_SETTING = new Setting<>(
@@ -528,6 +531,7 @@ public IndexService newIndexService(
528531
client,
529532
queryCache,
530533
directoryFactory,
534+
REMOTE_DIRECTORY_FACTORY,
531535
eventListener,
532536
readerWrapperFactory,
533537
mapperRegistry,

server/src/main/java/org/opensearch/index/IndexService.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import org.opensearch.index.shard.IndexShard;
8282
import org.opensearch.index.shard.IndexShardClosedException;
8383
import org.opensearch.index.shard.IndexingOperationListener;
84+
import org.opensearch.index.shard.RemoteStoreRefreshListener;
8485
import org.opensearch.index.shard.SearchOperationListener;
8586
import org.opensearch.index.shard.ShardId;
8687
import org.opensearch.index.shard.ShardNotFoundException;
@@ -96,6 +97,9 @@
9697
import org.opensearch.indices.recovery.RecoveryState;
9798
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
9899
import org.opensearch.plugins.IndexStorePlugin;
100+
import org.opensearch.repositories.RepositoriesService;
101+
import org.opensearch.repositories.Repository;
102+
import org.opensearch.repositories.RepositoryMissingException;
99103
import org.opensearch.script.ScriptService;
100104
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
101105
import org.opensearch.threadpool.ThreadPool;
@@ -136,6 +140,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
136140
private final NodeEnvironment nodeEnv;
137141
private final ShardStoreDeleter shardStoreDeleter;
138142
private final IndexStorePlugin.DirectoryFactory directoryFactory;
143+
private final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory;
139144
private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory;
140145
private final CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper;
141146
private final IndexCache indexCache;
@@ -190,6 +195,7 @@ public IndexService(
190195
Client client,
191196
QueryCache queryCache,
192197
IndexStorePlugin.DirectoryFactory directoryFactory,
198+
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
193199
IndexEventListener eventListener,
194200
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> wrapperFactory,
195201
MapperRegistry mapperRegistry,
@@ -260,6 +266,7 @@ public IndexService(
260266
this.eventListener = eventListener;
261267
this.nodeEnv = nodeEnv;
262268
this.directoryFactory = directoryFactory;
269+
this.remoteDirectoryFactory = remoteDirectoryFactory;
263270
this.recoveryStateFactory = recoveryStateFactory;
264271
this.engineFactory = Objects.requireNonNull(engineFactory);
265272
this.engineConfigFactory = Objects.requireNonNull(engineConfigFactory);
@@ -430,7 +437,8 @@ public synchronized IndexShard createShard(
430437
final ShardRouting routing,
431438
final Consumer<ShardId> globalCheckpointSyncer,
432439
final RetentionLeaseSyncer retentionLeaseSyncer,
433-
final SegmentReplicationCheckpointPublisher checkpointPublisher
440+
final SegmentReplicationCheckpointPublisher checkpointPublisher,
441+
final RepositoriesService repositoriesService
434442
) throws IOException {
435443
Objects.requireNonNull(retentionLeaseSyncer);
436444
/*
@@ -504,6 +512,21 @@ public synchronized IndexShard createShard(
504512
}
505513
};
506514
Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
515+
Directory remoteDirectory = null;
516+
RemoteStoreRefreshListener remoteStoreRefreshListener = null;
517+
if (this.indexSettings.isRemoteStoreEnabled()) {
518+
try {
519+
Repository repository = repositoriesService.repository(clusterService.state().metadata().clusterUUID());
520+
remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path, repository);
521+
remoteStoreRefreshListener = new RemoteStoreRefreshListener(directory, remoteDirectory);
522+
} catch (RepositoryMissingException e) {
523+
throw new IllegalArgumentException(
524+
"Repository should be created before creating index with remote_store enabled setting",
525+
e
526+
);
527+
}
528+
}
529+
507530
store = new Store(
508531
shardId,
509532
this.indexSettings,
@@ -533,7 +556,8 @@ public synchronized IndexShard createShard(
533556
() -> globalCheckpointSyncer.accept(shardId),
534557
retentionLeaseSyncer,
535558
circuitBreakerService,
536-
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null
559+
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
560+
remoteStoreRefreshListener
537561
);
538562
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
539563
eventListener.afterIndexShardCreated(indexShard);

server/src/main/java/org/opensearch/index/IndexSettings.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,7 @@ public final class IndexSettings {
547547
private final Settings nodeSettings;
548548
private final int numberOfShards;
549549
private final ReplicationType replicationType;
550+
private final boolean isRemoteStoreEnabled;
550551
// volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock
551552
private volatile Settings settings;
552553
private volatile IndexMetadata indexMetadata;
@@ -703,6 +704,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
703704
this.indexMetadata = indexMetadata;
704705
numberOfShards = settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, null);
705706
replicationType = ReplicationType.parseString(settings.get(IndexMetadata.SETTING_REPLICATION_TYPE));
707+
isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE, false);
706708

707709
this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings);
708710
this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings);
@@ -946,6 +948,13 @@ public boolean isSegRepEnabled() {
946948
return ReplicationType.SEGMENT.equals(replicationType);
947949
}
948950

951+
/**
952+
* Returns if remote store is enabled for this index.
953+
*/
954+
public boolean isRemoteStoreEnabled() {
955+
return isRemoteStoreEnabled;
956+
}
957+
949958
/**
950959
* Returns the node settings. The settings returned from {@link #getSettings()} are a merged version of the
951960
* index settings and the node settings where node settings are overwritten by index settings.

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,8 @@ Runnable getGlobalCheckpointSyncer() {
305305
private final RefreshPendingLocationListener refreshPendingLocationListener;
306306
private volatile boolean useRetentionLeasesInPeerRecovery;
307307

308+
private final RemoteStoreRefreshListener remoteStoreRefreshListener;
309+
308310
public IndexShard(
309311
final ShardRouting shardRouting,
310312
final IndexSettings indexSettings,
@@ -326,7 +328,8 @@ public IndexShard(
326328
final Runnable globalCheckpointSyncer,
327329
final RetentionLeaseSyncer retentionLeaseSyncer,
328330
final CircuitBreakerService circuitBreakerService,
329-
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher
331+
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
332+
@Nullable final RemoteStoreRefreshListener remoteStoreRefreshListener
330333
) throws IOException {
331334
super(shardRouting.shardId(), indexSettings);
332335
assert shardRouting.initializing();
@@ -410,6 +413,7 @@ public boolean shouldCache(Query query) {
410413
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
411414
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
412415
this.checkpointPublisher = checkpointPublisher;
416+
this.remoteStoreRefreshListener = remoteStoreRefreshListener;
413417
}
414418

415419
public ThreadPool getThreadPool() {
@@ -3222,7 +3226,9 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
32223226

32233227
final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
32243228
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
3225-
3229+
if (remoteStoreRefreshListener != null && shardRouting.primary()) {
3230+
internalRefreshListener.add(remoteStoreRefreshListener);
3231+
}
32263232
if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary()) {
32273233
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
32283234
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.shard;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.apache.logging.log4j.message.ParameterizedMessage;
14+
import org.apache.lucene.search.ReferenceManager;
15+
import org.apache.lucene.store.Directory;
16+
import org.apache.lucene.store.IOContext;
17+
18+
import java.io.IOException;
19+
import java.nio.file.NoSuchFileException;
20+
import java.util.Arrays;
21+
import java.util.HashSet;
22+
import java.util.Set;
23+
24+
/**
25+
* RefreshListener implementation to upload newly created segment files to the remote store
26+
*/
27+
public class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener {
28+
29+
private final Directory storeDirectory;
30+
private final Directory remoteDirectory;
31+
// ToDo: This can be a map with metadata of the uploaded file as value of the map (GitHub #3398)
32+
private final Set<String> filesUploadedToRemoteStore;
33+
private static final Logger logger = LogManager.getLogger(RemoteStoreRefreshListener.class);
34+
35+
public RemoteStoreRefreshListener(Directory storeDirectory, Directory remoteDirectory) throws IOException {
36+
this.storeDirectory = storeDirectory;
37+
this.remoteDirectory = remoteDirectory;
38+
// ToDo: Handle failures in reading list of files (GitHub #3397)
39+
this.filesUploadedToRemoteStore = new HashSet<>(Arrays.asList(remoteDirectory.listAll()));
40+
}
41+
42+
@Override
43+
public void beforeRefresh() throws IOException {
44+
// Do Nothing
45+
}
46+
47+
/**
48+
* Upload new segment files created as part of the last refresh to the remote segment store.
49+
* The method also deletes segment files from remote store which are not part of local filesystem.
50+
* @param didRefresh true if the refresh opened a new reference
51+
* @throws IOException in case of I/O error in reading list of local files
52+
*/
53+
@Override
54+
public void afterRefresh(boolean didRefresh) throws IOException {
55+
if (didRefresh) {
56+
Set<String> localFiles = Set.of(storeDirectory.listAll());
57+
localFiles.stream().filter(file -> !filesUploadedToRemoteStore.contains(file)).forEach(file -> {
58+
try {
59+
remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT);
60+
filesUploadedToRemoteStore.add(file);
61+
} catch (NoSuchFileException e) {
62+
logger.info(
63+
() -> new ParameterizedMessage("The file {} does not exist anymore. It can happen in case of temp files", file),
64+
e
65+
);
66+
} catch (IOException e) {
67+
// ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397)
68+
logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e);
69+
}
70+
});
71+
72+
Set<String> remoteFilesToBeDeleted = new HashSet<>();
73+
// ToDo: Instead of deleting files in sync, mark them and delete in async/periodic flow (GitHub #3142)
74+
filesUploadedToRemoteStore.stream().filter(file -> !localFiles.contains(file)).forEach(file -> {
75+
try {
76+
remoteDirectory.deleteFile(file);
77+
remoteFilesToBeDeleted.add(file);
78+
} catch (IOException e) {
79+
// ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397)
80+
logger.warn(() -> new ParameterizedMessage("Exception while deleting file {} from the remote segment store", file), e);
81+
}
82+
});
83+
84+
remoteFilesToBeDeleted.forEach(filesUploadedToRemoteStore::remove);
85+
}
86+
}
87+
}

0 commit comments

Comments
 (0)