Skip to content

Commit f2a3889

Browse files
sachinpkaleSachin Kale
andcommitted
Add RemoteDirectoryFactory and use RemoteDirectory instance in RefreshListener (#3285)
Co-authored-by: Sachin Kale <[email protected]> Signed-off-by: Sachin Kale <[email protected]>
1 parent 07f6f6c commit f2a3889

File tree

11 files changed

+211
-11
lines changed

11 files changed

+211
-11
lines changed

server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -675,7 +675,8 @@ public static final IndexShard newIndexShard(
675675
() -> {},
676676
RetentionLeaseSyncer.EMPTY,
677677
cbs,
678-
SegmentReplicationCheckpointPublisher.EMPTY
678+
SegmentReplicationCheckpointPublisher.EMPTY,
679+
null
679680
);
680681
}
681682

server/src/main/java/org/opensearch/index/IndexModule.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import org.opensearch.index.shard.SearchOperationListener;
7171
import org.opensearch.index.similarity.SimilarityService;
7272
import org.opensearch.index.store.FsDirectoryFactory;
73+
import org.opensearch.index.store.RemoteDirectoryFactory;
7374
import org.opensearch.indices.IndicesQueryCache;
7475
import org.opensearch.indices.breaker.CircuitBreakerService;
7576
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
@@ -118,6 +119,8 @@ public final class IndexModule {
118119

119120
private static final FsDirectoryFactory DEFAULT_DIRECTORY_FACTORY = new FsDirectoryFactory();
120121

122+
private static final RemoteDirectoryFactory REMOTE_DIRECTORY_FACTORY = new RemoteDirectoryFactory();
123+
121124
private static final IndexStorePlugin.RecoveryStateFactory DEFAULT_RECOVERY_STATE_FACTORY = RecoveryState::new;
122125

123126
public static final Setting<String> INDEX_STORE_TYPE_SETTING = new Setting<>(
@@ -516,6 +519,7 @@ public IndexService newIndexService(
516519
client,
517520
queryCache,
518521
directoryFactory,
522+
REMOTE_DIRECTORY_FACTORY,
519523
eventListener,
520524
readerWrapperFactory,
521525
mapperRegistry,

server/src/main/java/org/opensearch/index/IndexService.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import org.opensearch.index.shard.IndexShard;
8282
import org.opensearch.index.shard.IndexShardClosedException;
8383
import org.opensearch.index.shard.IndexingOperationListener;
84+
import org.opensearch.index.shard.RemoteStoreRefreshListener;
8485
import org.opensearch.index.shard.SearchOperationListener;
8586
import org.opensearch.index.shard.ShardId;
8687
import org.opensearch.index.shard.ShardNotFoundException;
@@ -96,6 +97,9 @@
9697
import org.opensearch.indices.recovery.RecoveryState;
9798
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
9899
import org.opensearch.plugins.IndexStorePlugin;
100+
import org.opensearch.repositories.RepositoriesService;
101+
import org.opensearch.repositories.Repository;
102+
import org.opensearch.repositories.RepositoryMissingException;
99103
import org.opensearch.script.ScriptService;
100104
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
101105
import org.opensearch.threadpool.ThreadPool;
@@ -136,6 +140,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
136140
private final NodeEnvironment nodeEnv;
137141
private final ShardStoreDeleter shardStoreDeleter;
138142
private final IndexStorePlugin.DirectoryFactory directoryFactory;
143+
private final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory;
139144
private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory;
140145
private final CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper;
141146
private final IndexCache indexCache;
@@ -190,6 +195,7 @@ public IndexService(
190195
Client client,
191196
QueryCache queryCache,
192197
IndexStorePlugin.DirectoryFactory directoryFactory,
198+
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
193199
IndexEventListener eventListener,
194200
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> wrapperFactory,
195201
MapperRegistry mapperRegistry,
@@ -260,6 +266,7 @@ public IndexService(
260266
this.eventListener = eventListener;
261267
this.nodeEnv = nodeEnv;
262268
this.directoryFactory = directoryFactory;
269+
this.remoteDirectoryFactory = remoteDirectoryFactory;
263270
this.recoveryStateFactory = recoveryStateFactory;
264271
this.engineFactory = Objects.requireNonNull(engineFactory);
265272
this.engineConfigFactory = Objects.requireNonNull(engineConfigFactory);
@@ -430,7 +437,8 @@ public synchronized IndexShard createShard(
430437
final ShardRouting routing,
431438
final Consumer<ShardId> globalCheckpointSyncer,
432439
final RetentionLeaseSyncer retentionLeaseSyncer,
433-
final SegmentReplicationCheckpointPublisher checkpointPublisher
440+
final SegmentReplicationCheckpointPublisher checkpointPublisher,
441+
final RepositoriesService repositoriesService
434442
) throws IOException {
435443
Objects.requireNonNull(retentionLeaseSyncer);
436444
/*
@@ -504,6 +512,21 @@ public synchronized IndexShard createShard(
504512
}
505513
};
506514
Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
515+
Directory remoteDirectory = null;
516+
RemoteStoreRefreshListener remoteStoreRefreshListener = null;
517+
if (this.indexSettings.isRemoteStoreEnabled()) {
518+
try {
519+
Repository repository = repositoriesService.repository("dragon-stone");
520+
remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path, repository);
521+
remoteStoreRefreshListener = new RemoteStoreRefreshListener(directory, remoteDirectory);
522+
} catch (RepositoryMissingException e) {
523+
throw new IllegalArgumentException(
524+
"Repository should be created before creating index with remote_store enabled setting",
525+
e
526+
);
527+
}
528+
}
529+
507530
store = new Store(
508531
shardId,
509532
this.indexSettings,
@@ -533,7 +556,8 @@ public synchronized IndexShard createShard(
533556
() -> globalCheckpointSyncer.accept(shardId),
534557
retentionLeaseSyncer,
535558
circuitBreakerService,
536-
this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null
559+
this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null,
560+
remoteStoreRefreshListener
537561
);
538562
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
539563
eventListener.afterIndexShardCreated(indexShard);

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,8 @@ Runnable getGlobalCheckpointSyncer() {
304304
private volatile boolean useRetentionLeasesInPeerRecovery;
305305
private final ReferenceManager.RefreshListener checkpointRefreshListener;
306306

307+
private final RemoteStoreRefreshListener remoteStoreRefreshListener;
308+
307309
public IndexShard(
308310
final ShardRouting shardRouting,
309311
final IndexSettings indexSettings,
@@ -325,7 +327,8 @@ public IndexShard(
325327
final Runnable globalCheckpointSyncer,
326328
final RetentionLeaseSyncer retentionLeaseSyncer,
327329
final CircuitBreakerService circuitBreakerService,
328-
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher
330+
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
331+
final RemoteStoreRefreshListener remoteStoreRefreshListener
329332
) throws IOException {
330333
super(shardRouting.shardId(), indexSettings);
331334
assert shardRouting.initializing();
@@ -413,6 +416,7 @@ public boolean shouldCache(Query query) {
413416
} else {
414417
this.checkpointRefreshListener = null;
415418
}
419+
this.remoteStoreRefreshListener = remoteStoreRefreshListener;
416420
}
417421

418422
public ThreadPool getThreadPool() {
@@ -3131,11 +3135,13 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
31313135
}
31323136
};
31333137

3134-
final List<ReferenceManager.RefreshListener> internalRefreshListener;
3138+
final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
3139+
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
3140+
if (remoteStoreRefreshListener != null && shardRouting.primary()) {
3141+
internalRefreshListener.add(remoteStoreRefreshListener);
3142+
}
31353143
if (this.checkpointRefreshListener != null) {
3136-
internalRefreshListener = Arrays.asList(new RefreshMetricUpdater(refreshMetric), checkpointRefreshListener);
3137-
} else {
3138-
internalRefreshListener = Collections.singletonList(new RefreshMetricUpdater(refreshMetric));
3144+
internalRefreshListener.add(checkpointRefreshListener);
31393145
}
31403146

31413147
return this.engineConfigFactory.newEngineConfig(
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.shard;
10+
11+
import org.apache.lucene.search.ReferenceManager;
12+
import org.apache.lucene.store.Directory;
13+
14+
import java.io.IOException;
15+
16+
/**
17+
* RefreshListener implementation to upload newly created segment files to the remote store
18+
*/
19+
public class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener {
20+
21+
private final Directory storeDirectory;
22+
private final Directory remoteDirectory;
23+
24+
public RemoteStoreRefreshListener(Directory storeDirectory, Directory remoteDirectory) {
25+
this.storeDirectory = storeDirectory;
26+
this.remoteDirectory = remoteDirectory;
27+
}
28+
29+
@Override
30+
public void beforeRefresh() throws IOException {
31+
// ToDo Add implementation
32+
}
33+
34+
@Override
35+
public void afterRefresh(boolean didRefresh) throws IOException {
36+
// ToDo Add implementation
37+
}
38+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.store;
10+
11+
import org.apache.lucene.store.Directory;
12+
import org.opensearch.common.blobstore.BlobContainer;
13+
import org.opensearch.common.blobstore.BlobPath;
14+
import org.opensearch.index.IndexSettings;
15+
import org.opensearch.index.shard.ShardPath;
16+
import org.opensearch.plugins.IndexStorePlugin;
17+
import org.opensearch.repositories.Repository;
18+
import org.opensearch.repositories.blobstore.BlobStoreRepository;
19+
20+
import java.io.IOException;
21+
22+
/**
23+
* Factory for a remote store directory
24+
*
25+
* @opensearch.internal
26+
*/
27+
public class RemoteDirectoryFactory implements IndexStorePlugin.RemoteDirectoryFactory {
28+
29+
@Override
30+
public Directory newDirectory(IndexSettings indexSettings, ShardPath path, Repository repository) throws IOException {
31+
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
32+
BlobPath blobPath = new BlobPath();
33+
blobPath = blobPath.add(indexSettings.getIndex().getName()).add(String.valueOf(path.getShardId().getId()));
34+
BlobContainer blobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(blobPath);
35+
return new RemoteDirectory(blobContainer);
36+
}
37+
}

server/src/main/java/org/opensearch/indices/IndicesService.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -859,7 +859,13 @@ public IndexShard createShard(
859859
IndexService indexService = indexService(shardRouting.index());
860860
assert indexService != null;
861861
RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode);
862-
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher);
862+
IndexShard indexShard = indexService.createShard(
863+
shardRouting,
864+
globalCheckpointSyncer,
865+
retentionLeaseSyncer,
866+
checkpointPublisher,
867+
repositoriesService
868+
);
863869
indexShard.addShardFailureCallback(onShardFailure);
864870
indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> {
865871
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS

server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.opensearch.index.IndexSettings;
4040
import org.opensearch.index.shard.ShardPath;
4141
import org.opensearch.indices.recovery.RecoveryState;
42+
import org.opensearch.repositories.Repository;
4243

4344
import java.io.IOException;
4445
import java.util.Collections;
@@ -66,6 +67,22 @@ interface DirectoryFactory {
6667
Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException;
6768
}
6869

70+
/**
71+
* An interface that describes how to create a new remote directory instance per shard.
72+
*/
73+
@FunctionalInterface
74+
interface RemoteDirectoryFactory {
75+
/**
76+
* Creates a new remote directory per shard. This method is called once per shard on shard creation.
77+
* @param indexSettings the shards index settings
78+
* @param shardPath the path the shard is using
79+
* @param repository to get the BlobContainer details
80+
* @return a new RemoteDirectory instance
81+
* @throws IOException if an IOException occurs while opening the directory
82+
*/
83+
Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath, Repository repository) throws IOException;
84+
}
85+
6986
/**
7087
* The {@link DirectoryFactory} mappings for this plugin. When an index is created the store type setting
7188
* {@link org.opensearch.index.IndexModule#INDEX_STORE_TYPE_SETTING} on the index will be examined and either use the default or a
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.store;
10+
11+
import org.apache.lucene.store.Directory;
12+
import org.junit.Before;
13+
import org.mockito.ArgumentCaptor;
14+
import org.opensearch.common.blobstore.BlobContainer;
15+
import org.opensearch.common.blobstore.BlobPath;
16+
import org.opensearch.common.blobstore.BlobStore;
17+
import org.opensearch.common.settings.Settings;
18+
import org.opensearch.index.IndexSettings;
19+
import org.opensearch.index.shard.ShardId;
20+
import org.opensearch.index.shard.ShardPath;
21+
import org.opensearch.repositories.blobstore.BlobStoreRepository;
22+
import org.opensearch.test.IndexSettingsModule;
23+
import org.opensearch.test.OpenSearchTestCase;
24+
25+
import java.io.IOException;
26+
import java.nio.file.Path;
27+
import java.util.Collections;
28+
29+
import static org.mockito.ArgumentMatchers.any;
30+
import static org.mockito.Mockito.mock;
31+
import static org.mockito.Mockito.when;
32+
import static org.mockito.Mockito.verify;
33+
34+
public class RemoteDirectoryFactoryTests extends OpenSearchTestCase {
35+
36+
private RemoteDirectoryFactory remoteDirectoryFactory;
37+
38+
@Before
39+
public void setup() {
40+
remoteDirectoryFactory = new RemoteDirectoryFactory();
41+
}
42+
43+
public void testNewDirectory() throws IOException {
44+
Settings settings = Settings.builder().build();
45+
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings);
46+
Path tempDir = createTempDir().resolve(indexSettings.getUUID()).resolve("0");
47+
ShardPath shardPath = new ShardPath(false, tempDir, tempDir, new ShardId(indexSettings.getIndex(), 0));
48+
BlobStoreRepository repository = mock(BlobStoreRepository.class);
49+
BlobStore blobStore = mock(BlobStore.class);
50+
BlobContainer blobContainer = mock(BlobContainer.class);
51+
when(repository.blobStore()).thenReturn(blobStore);
52+
when(blobStore.blobContainer(any())).thenReturn(blobContainer);
53+
when(blobContainer.listBlobs()).thenReturn(Collections.emptyMap());
54+
55+
Directory directory = remoteDirectoryFactory.newDirectory(indexSettings, shardPath, repository);
56+
assertTrue(directory instanceof RemoteDirectory);
57+
ArgumentCaptor<BlobPath> blobPathCaptor = ArgumentCaptor.forClass(BlobPath.class);
58+
verify(blobStore).blobContainer(blobPathCaptor.capture());
59+
BlobPath blobPath = blobPathCaptor.getValue();
60+
assertEquals("foo/0/", blobPath.buildAsString());
61+
62+
directory.listAll();
63+
verify(blobContainer).listBlobs();
64+
}
65+
}

server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,8 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem
153153
newRouting,
154154
s -> {},
155155
RetentionLeaseSyncer.EMPTY,
156-
SegmentReplicationCheckpointPublisher.EMPTY
156+
SegmentReplicationCheckpointPublisher.EMPTY,
157+
null
157158
);
158159
IndexShardTestCase.updateRoutingEntry(shard, newRouting);
159160
assertEquals(5, counter.get());

test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -525,7 +525,8 @@ protected IndexShard newShard(
525525
globalCheckpointSyncer,
526526
retentionLeaseSyncer,
527527
breakerService,
528-
checkpointPublisher
528+
checkpointPublisher,
529+
null
529530
);
530531
indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER);
531532
success = true;

0 commit comments

Comments
 (0)