Skip to content

Commit 1476131

Browse files
author
sukriti sinha
committed
Created an API to fetch remote store segment data
Signed-off-by: sukriti sinha <[email protected]>
1 parent 58c281f commit 1476131

17 files changed

+1354
-0
lines changed
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.remotestore;
10+
11+
import org.opensearch.action.admin.cluster.remotestore.metadata.RemoteStoreMetadata;
12+
import org.opensearch.action.admin.cluster.remotestore.metadata.RemoteStoreMetadataResponse;
13+
import org.opensearch.cluster.ClusterState;
14+
import org.opensearch.cluster.node.DiscoveryNode;
15+
import org.opensearch.plugins.Plugin;
16+
import org.opensearch.test.OpenSearchIntegTestCase;
17+
import org.opensearch.test.transport.MockTransportService;
18+
19+
import java.util.Collection;
20+
import java.util.List;
21+
import java.util.Map;
22+
import java.util.stream.Collectors;
23+
import java.util.stream.Stream;
24+
25+
import static org.hamcrest.Matchers.allOf;
26+
import static org.hamcrest.Matchers.hasKey;
27+
28+
29+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
30+
public class RemoteStoreMetadataIT extends RemoteStoreBaseIntegTestCase {
31+
32+
private static final String INDEX_NAME = "remote-store-meta-api-test";
33+
34+
@Override
35+
protected Collection<Class<? extends Plugin>> nodePlugins() {
36+
return Stream.concat(super.nodePlugins().stream(), Stream.of(MockTransportService.TestPlugin.class)).collect(Collectors.toList());
37+
}
38+
39+
public void setup() {
40+
internalCluster().startNodes(3);
41+
}
42+
43+
@SuppressWarnings("unchecked")
44+
public void testMetadataResponseFromAllNodes() {
45+
setup();
46+
47+
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 3));
48+
ensureGreen(INDEX_NAME);
49+
indexDocs();
50+
client().admin().indices().prepareRefresh(INDEX_NAME).get();
51+
52+
ClusterState state = getClusterState();
53+
List<String> nodes = state.nodes().getNodes().values().stream().map(DiscoveryNode::getName).collect(Collectors.toList());
54+
55+
for (String node : nodes) {
56+
RemoteStoreMetadataResponse response = client(node).admin().cluster().prepareRemoteStoreMetadata(INDEX_NAME, null).get();
57+
assertTrue(response.getSuccessfulShards() > 0);
58+
assertNotNull(response.groupByIndexAndShards());
59+
60+
response.groupByIndexAndShards().forEach((index, shardMap) -> {
61+
shardMap.forEach((shardId, metadataList) -> {
62+
assertFalse(metadataList.isEmpty());
63+
64+
for (RemoteStoreMetadata metadata : metadataList) {
65+
assertEquals(index, metadata.getIndexName());
66+
assertEquals((int) shardId, metadata.getShardId());
67+
68+
Map<String, Object> segments = metadata.getSegments();
69+
assertNotNull(segments);
70+
assertTrue(segments.containsKey("files"));
71+
Map<String, Object> files = (Map<String, Object>) segments.get("files");
72+
assertFalse(files.isEmpty());
73+
74+
for (Map.Entry<String, Object> entry : files.entrySet()) {
75+
Map<String, Object> fileMeta = (Map<String, Object>) entry.getValue();
76+
assertThat(fileMeta, allOf(hasKey("original_name"), hasKey("checksum"), hasKey("length")));
77+
}
78+
79+
assertTrue(segments.containsKey("replication_checkpoint"));
80+
Map<String, Object> checkpoint = (Map<String, Object>) segments.get("replication_checkpoint");
81+
assertThat(checkpoint, allOf(hasKey("primary_term"), hasKey("segments_gen"), hasKey("segment_infos_version"), hasKey("codec"), hasKey("created_timestamp")));
82+
83+
Map<String, Object> translog = metadata.getTranslog();
84+
assertNotNull(translog);
85+
assertThat(translog, allOf(hasKey("primary_term"), hasKey("generation"), hasKey("min_translog_gen"), hasKey("generation_to_primary_term")));
86+
}
87+
});
88+
});
89+
}
90+
}
91+
92+
@SuppressWarnings("unchecked")
93+
public void testMetadataResponseAllShards() {
94+
setup();
95+
96+
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 2));
97+
ensureGreen(INDEX_NAME);
98+
indexDocs();
99+
client().admin().indices().prepareRefresh(INDEX_NAME).get();
100+
101+
RemoteStoreMetadataResponse response = client().admin().cluster().prepareRemoteStoreMetadata(INDEX_NAME, null).get();
102+
assertEquals(2, response.getSuccessfulShards());
103+
104+
response.groupByIndexAndShards().forEach((index, shardMap) -> {
105+
shardMap.forEach((shardId, metadataList) -> {
106+
assertFalse(metadataList.isEmpty());
107+
108+
for (RemoteStoreMetadata metadata : metadataList) {
109+
assertEquals(index, metadata.getIndexName());
110+
assertEquals((int) shardId, metadata.getShardId());
111+
112+
Map<String, Object> segments = metadata.getSegments();
113+
assertNotNull(segments);
114+
assertTrue(segments.containsKey("files"));
115+
Map<String, Object> files = (Map<String, Object>) segments.get("files");
116+
assertFalse(files.isEmpty());
117+
118+
for (Map.Entry<String, Object> entry : files.entrySet()) {
119+
Map<String, Object> fileMeta = (Map<String, Object>) entry.getValue();
120+
assertThat(fileMeta, allOf(hasKey("original_name"), hasKey("checksum"), hasKey("length")));
121+
}
122+
123+
assertTrue(segments.containsKey("replication_checkpoint"));
124+
Map<String, Object> checkpoint = (Map<String, Object>) segments.get("replication_checkpoint");
125+
assertThat(checkpoint, allOf(hasKey("primary_term"), hasKey("segments_gen"), hasKey("segment_infos_version"), hasKey("codec"), hasKey("created_timestamp")));
126+
127+
Map<String, Object> translog = metadata.getTranslog();
128+
assertNotNull(translog);
129+
assertThat(translog, allOf(hasKey("primary_term"), hasKey("generation"), hasKey("min_translog_gen"), hasKey("generation_to_primary_term")));
130+
}
131+
});
132+
});
133+
}
134+
135+
private void indexDocs() {
136+
for (int i = 0; i < randomIntBetween(10, 20); i++) {
137+
client().prepareIndex(INDEX_NAME).setId("doc-" + i).setSource("field", "value-" + i).get();
138+
}
139+
}
140+
}

server/src/main/java/org/opensearch/action/ActionModule.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@
6767
import org.opensearch.action.admin.cluster.node.usage.TransportNodesUsageAction;
6868
import org.opensearch.action.admin.cluster.remote.RemoteInfoAction;
6969
import org.opensearch.action.admin.cluster.remote.TransportRemoteInfoAction;
70+
import org.opensearch.action.admin.cluster.remotestore.metadata.RemoteStoreMetadataAction;
71+
import org.opensearch.action.admin.cluster.remotestore.metadata.TransportRemoteStoreMetadataAction;
7072
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreAction;
7173
import org.opensearch.action.admin.cluster.remotestore.restore.TransportRestoreRemoteStoreAction;
7274
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsAction;
@@ -378,6 +380,7 @@
378380
import org.opensearch.rest.action.admin.cluster.RestPutStoredScriptAction;
379381
import org.opensearch.rest.action.admin.cluster.RestReloadSecureSettingsAction;
380382
import org.opensearch.rest.action.admin.cluster.RestRemoteClusterInfoAction;
383+
import org.opensearch.rest.action.admin.cluster.RestRemoteStoreMetadataAction;
381384
import org.opensearch.rest.action.admin.cluster.RestRemoteStoreStatsAction;
382385
import org.opensearch.rest.action.admin.cluster.RestRestoreRemoteStoreAction;
383386
import org.opensearch.rest.action.admin.cluster.RestRestoreSnapshotAction;
@@ -638,6 +641,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
638641
actions.register(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class);
639642
actions.register(WlmStatsAction.INSTANCE, TransportWlmStatsAction.class);
640643
actions.register(RemoteStoreStatsAction.INSTANCE, TransportRemoteStoreStatsAction.class);
644+
actions.register(RemoteStoreMetadataAction.INSTANCE, TransportRemoteStoreMetadataAction.class);
641645
actions.register(NodesUsageAction.INSTANCE, TransportNodesUsageAction.class);
642646
actions.register(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class);
643647
actions.register(ListTasksAction.INSTANCE, TransportListTasksAction.class);
@@ -1053,6 +1057,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
10531057
registerHandler.accept(new RestGetDecommissionStateAction());
10541058
registerHandler.accept(new RestRemoteStoreStatsAction());
10551059
registerHandler.accept(new RestRestoreRemoteStoreAction());
1060+
registerHandler.accept(new RestRemoteStoreMetadataAction());
10561061

10571062
// pull-based ingestion API
10581063
registerHandler.accept(new RestPauseIngestionAction());
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.cluster.remotestore.metadata;
10+
11+
import org.opensearch.common.annotation.ExperimentalApi;
12+
import org.opensearch.core.common.io.stream.StreamInput;
13+
import org.opensearch.core.common.io.stream.StreamOutput;
14+
import org.opensearch.core.common.io.stream.Writeable;
15+
import org.opensearch.core.xcontent.ToXContentFragment;
16+
import org.opensearch.core.xcontent.XContentBuilder;
17+
18+
import java.io.IOException;
19+
import java.util.Map;
20+
21+
/**
22+
* Response model that holds the remote store metadata (segment and translog) for a shard.
23+
*
24+
* @opensearch.internal
25+
*/
26+
@ExperimentalApi
27+
public class RemoteStoreMetadata implements Writeable, ToXContentFragment {
28+
private final Map<String, Object> segments;
29+
private final Map<String, Object> translog;
30+
private final String indexName;
31+
private final int shardId;
32+
33+
public RemoteStoreMetadata(Map<String, Object> segments, Map<String, Object> translog, String indexName, int shardId) {
34+
this.segments = segments;
35+
this.translog = translog;
36+
this.indexName = indexName;
37+
this.shardId = shardId;
38+
}
39+
40+
public RemoteStoreMetadata(StreamInput in) throws IOException {
41+
this.segments = in.readMap();
42+
this.translog = in.readMap();
43+
this.indexName = in.readString();
44+
this.shardId = in.readInt();
45+
}
46+
47+
@Override
48+
public void writeTo(StreamOutput out) throws IOException {
49+
out.writeMap(segments);
50+
out.writeMap(translog);
51+
out.writeString(indexName);
52+
out.writeInt(shardId);
53+
}
54+
55+
@Override
56+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
57+
builder.startObject();
58+
59+
builder.field("index", indexName);
60+
builder.field("shard", shardId);
61+
62+
builder.startObject("segments");
63+
for (Map.Entry<String, Object> entry : segments.entrySet()) {
64+
builder.field(entry.getKey(), entry.getValue());
65+
}
66+
builder.endObject();
67+
68+
builder.startObject("translog");
69+
for (Map.Entry<String, Object> entry : translog.entrySet()) {
70+
builder.field(entry.getKey(), entry.getValue());
71+
}
72+
builder.endObject();
73+
74+
return builder.endObject();
75+
}
76+
77+
public String getIndexName() {
78+
return indexName;
79+
}
80+
81+
public int getShardId() {
82+
return shardId;
83+
}
84+
85+
public Map<String, Object> getSegments() {
86+
return segments;
87+
}
88+
89+
public Map<String, Object> getTranslog() {
90+
return translog;
91+
}
92+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.cluster.remotestore.metadata;
10+
11+
import org.opensearch.action.ActionType;
12+
13+
/**
14+
* Action to fetch metadata from remote store
15+
*
16+
* @opensearch.internal
17+
*/
18+
public class RemoteStoreMetadataAction extends ActionType<RemoteStoreMetadataResponse> {
19+
public static final RemoteStoreMetadataAction INSTANCE = new RemoteStoreMetadataAction();
20+
public static final String NAME = "cluster:admin/remote_store/metadata";
21+
22+
private RemoteStoreMetadataAction() {
23+
super(NAME, RemoteStoreMetadataResponse::new);
24+
}
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.cluster.remotestore.metadata;
10+
11+
import org.opensearch.action.support.broadcast.BroadcastRequest;
12+
import org.opensearch.common.annotation.ExperimentalApi;
13+
import org.opensearch.core.common.io.stream.StreamInput;
14+
import org.opensearch.core.common.io.stream.StreamOutput;
15+
16+
import java.io.IOException;
17+
18+
/**
19+
* Request object for fetching remote store metadata of shards across one or more indices.
20+
*
21+
* @opensearch.internal
22+
*/
23+
@ExperimentalApi
24+
public class RemoteStoreMetadataRequest extends BroadcastRequest<RemoteStoreMetadataRequest> {
25+
private String[] shards;
26+
27+
public RemoteStoreMetadataRequest() {
28+
super((String[]) null);
29+
shards = new String[0];
30+
}
31+
32+
public RemoteStoreMetadataRequest(StreamInput in) throws IOException {
33+
super(in);
34+
shards = in.readStringArray();
35+
}
36+
37+
@Override
38+
public void writeTo(StreamOutput out) throws IOException {
39+
super.writeTo(out);
40+
out.writeStringArray(shards);
41+
}
42+
43+
public RemoteStoreMetadataRequest shards(String... shards) {
44+
this.shards = shards;
45+
return this;
46+
}
47+
48+
public String[] shards() {
49+
return this.shards;
50+
}
51+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.cluster.remotestore.metadata;
10+
11+
import org.opensearch.action.support.broadcast.BroadcastOperationRequestBuilder;
12+
import org.opensearch.common.annotation.ExperimentalApi;
13+
import org.opensearch.common.unit.TimeValue;
14+
import org.opensearch.transport.client.OpenSearchClient;
15+
16+
/**
17+
* Builder for RemoteStoreMetadataRequest
18+
*
19+
* @opensearch.api
20+
*/
21+
@ExperimentalApi
22+
public class RemoteStoreMetadataRequestBuilder extends BroadcastOperationRequestBuilder<
23+
RemoteStoreMetadataRequest,
24+
RemoteStoreMetadataResponse,
25+
RemoteStoreMetadataRequestBuilder> {
26+
27+
public RemoteStoreMetadataRequestBuilder(OpenSearchClient client, RemoteStoreMetadataAction action) {
28+
super(client, action, new RemoteStoreMetadataRequest());
29+
}
30+
31+
/**
32+
* Sets timeout of request.
33+
*/
34+
public final RemoteStoreMetadataRequestBuilder setTimeout(TimeValue timeout) {
35+
request.timeout(timeout);
36+
return this;
37+
}
38+
39+
/**
40+
* Sets shards preference of request.
41+
*/
42+
public final RemoteStoreMetadataRequestBuilder setShards(String... shards) {
43+
request.shards(shards);
44+
return this;
45+
}
46+
}

0 commit comments

Comments
 (0)