/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */
8
+
9
+ package org .opensearch .remotestore ;
10
+
11
+ import org .apache .logging .log4j .LogManager ;
12
+ import org .apache .logging .log4j .Logger ;
13
+ import org .opensearch .action .admin .cluster .remotestore .metadata .RemoteStoreMetadata ;
14
+ import org .opensearch .action .admin .cluster .remotestore .metadata .RemoteStoreMetadataResponse ;
15
+ import org .opensearch .action .index .IndexResponse ;
16
+ import org .opensearch .action .search .SearchResponse ;
17
+ import org .opensearch .cluster .ClusterModule ;
18
+ import org .opensearch .cluster .ClusterState ;
19
+ import org .opensearch .cluster .node .DiscoveryNode ;
20
+ import org .opensearch .common .settings .Settings ;
21
+ import org .opensearch .common .unit .TimeValue ;
22
+ import org .opensearch .core .common .bytes .BytesReference ;
23
+ import org .opensearch .common .xcontent .LoggingDeprecationHandler ;
24
+ import org .opensearch .core .xcontent .NamedXContentRegistry ;
25
+ import org .opensearch .core .xcontent .ToXContent ;
26
+ import org .opensearch .core .xcontent .XContentBuilder ;
27
+ import org .opensearch .common .xcontent .XContentFactory ;
28
+ import org .opensearch .core .xcontent .XContentParser ;
29
+ import org .opensearch .common .xcontent .XContentType ;
30
+ import org .opensearch .index .query .QueryBuilders ;
31
+ import org .opensearch .plugins .Plugin ;
32
+ import org .opensearch .core .rest .RestStatus ;
33
+ import org .opensearch .test .OpenSearchIntegTestCase ;
34
+ import org .opensearch .test .transport .MockTransportService ;
35
+ import org .junit .After ;
36
+
37
+ import java .util .ArrayList ;
38
+ import java .util .Collection ;
39
+ import java .util .HashMap ;
40
+ import java .util .List ;
41
+ import java .util .Map ;
42
+ import java .util .concurrent .TimeUnit ;
43
+ import java .util .stream .Collectors ;
44
+ import java .util .stream .Stream ;
45
+
46
+ import static org .opensearch .cluster .metadata .IndexMetadata .SETTING_NUMBER_OF_REPLICAS ;
47
+ import static org .opensearch .test .hamcrest .OpenSearchAssertions .assertAcked ;
48
+
49
+ @ OpenSearchIntegTestCase .ClusterScope (scope = OpenSearchIntegTestCase .Scope .TEST , numDataNodes = 0 )
50
+ public class RemoteStoreMetadataIT extends RemoteStoreBaseIntegTestCase {
51
+
52
+ private static final Logger logger = LogManager .getLogger (RemoteStoreMetadataIT .class );
53
+ private static final String INDEX_NAME = "remote-store-test-idx-1" ;
54
+
55
+ @ Override
56
+ protected Collection <Class <? extends Plugin >> nodePlugins () {
57
+ return Stream .concat (super .nodePlugins ().stream (), Stream .of (MockTransportService .TestPlugin .class ))
58
+ .collect (Collectors .toList ());
59
+ }
60
+
61
+ @ Override
62
+ protected NamedXContentRegistry xContentRegistry () {
63
+ return new NamedXContentRegistry (ClusterModule .getNamedXWriteables ());
64
+ }
65
+
66
+ @ After
67
+ public void cleanup () throws Exception {
68
+ // Clean up indices after each test
69
+ assertAcked (client ().admin ().indices ().prepareDelete ("*" ).get ());
70
+ assertBusy (() -> {
71
+ assertTrue (client ().admin ().indices ().prepareGetIndex ().get ().getIndices ().length == 0 );
72
+ }, 30 , TimeUnit .SECONDS );
73
+ }
74
+
75
+ public void setup () {
76
+ internalCluster ().startNodes (3 );
77
+ }
78
+
79
+ public void testMetadataResponseFromAllNodes () throws Exception {
80
+ setup ();
81
+
82
+ // Step 1 - We create the cluster, create an index with remote store settings, and index some documents.
83
+ // This sets up the environment and ensures that segment and translog metadata are generated and uploaded.
84
+ createIndex (INDEX_NAME , remoteStoreIndexSettings (0 ));
85
+ ensureYellowAndNoInitializingShards (INDEX_NAME );
86
+ ensureGreen (INDEX_NAME );
87
+ indexDocs ();
88
+
89
+ ClusterState state = getClusterState ();
90
+ List <String > nodes = state .nodes ().getNodes ().values ().stream ()
91
+ .map (DiscoveryNode ::getName )
92
+ .collect (Collectors .toList ());
93
+
94
+ // Step 2 - We collect all node names in the cluster and send RemoteStoreMetadata API requests from each node
95
+ // to verify that segment and translog metadata can be fetched successfully from all data nodes.
96
+ String shardId = "0" ;
97
+ for (String node : nodes ) {
98
+ assertBusy (() -> {
99
+ RemoteStoreMetadataResponse response = client (node ).admin ()
100
+ .cluster ()
101
+ .prepareRemoteStoreMetadata (INDEX_NAME , shardId )
102
+ .setTimeout (TimeValue .timeValueSeconds (30 ))
103
+ .get ();
104
+
105
+ assertTrue (response .getSuccessfulShards () > 0 );
106
+ Map <String , Map <Integer , List <RemoteStoreMetadata >>> indexMetadata = response .groupByIndexAndShards ();
107
+ assertNotNull ("Metadata should not be null" , indexMetadata );
108
+ assertTrue ("Index metadata should exist" , indexMetadata .containsKey (INDEX_NAME ));
109
+ assertTrue ("Shard metadata should exist" , indexMetadata .get (INDEX_NAME ).containsKey (0 ));
110
+
111
+ List <RemoteStoreMetadata > shardMetadata = indexMetadata .get (INDEX_NAME ).get (0 );
112
+ assertFalse ("Shard metadata should not be empty" , shardMetadata .isEmpty ());
113
+ validateSegmentMetadata (shardMetadata .get (0 ));
114
+ validateTranslogMetadata (shardMetadata .get (0 ));
115
+ }, 30 , TimeUnit .SECONDS );
116
+ }
117
+
118
+ changeReplicaCountAndEnsureGreen (1 );
119
+ verifyDocumentCount ();
120
+
121
+ for (String node : nodes ) {
122
+ assertBusy (() -> {
123
+ RemoteStoreMetadataResponse response = client (node ).admin ()
124
+ .cluster ()
125
+ .prepareRemoteStoreMetadata (INDEX_NAME , shardId )
126
+ .get ();
127
+
128
+ Map <String , Map <Integer , List <RemoteStoreMetadata >>> indexMetadata = response .groupByIndexAndShards ();
129
+ List <RemoteStoreMetadata > shardMetadata = indexMetadata .get (INDEX_NAME ).get (0 );
130
+
131
+ assertFalse ("Shard metadata should not be empty" , shardMetadata .isEmpty ());
132
+ validateSegmentMetadata (shardMetadata .get (0 ));
133
+ validateTranslogMetadata (shardMetadata .get (0 ));
134
+ }, 30 , TimeUnit .SECONDS );
135
+ }
136
+ }
137
+
138
+
139
+ public void testMetadataResponseAllShards () throws Exception {
140
+ setup ();
141
+
142
+ // Step 1 - We created multi-shard index and index some documents
143
+ createIndex (INDEX_NAME , remoteStoreIndexSettings (0 , 3 ));
144
+ ensureYellowAndNoInitializingShards (INDEX_NAME );
145
+ ensureGreen (INDEX_NAME );
146
+ indexDocs ();
147
+
148
+ // Step 3 - We invoke the RemoteStoreMetadata API without specifying a shard,
149
+ // which returns metadata for all shards of the given index. We then validate the response for each shard.
150
+ assertBusy (() -> {
151
+ RemoteStoreMetadataResponse response = client ().admin ()
152
+ .cluster ()
153
+ .prepareRemoteStoreMetadata (INDEX_NAME , null )
154
+ .get ();
155
+
156
+ assertEquals (3 , response .getSuccessfulShards ());
157
+ Map <String , Map <Integer , List <RemoteStoreMetadata >>> indexMetadata = response .groupByIndexAndShards ();
158
+ assertNotNull ("Metadata should not be null" , indexMetadata );
159
+ assertEquals (1 , indexMetadata .size ());
160
+ assertEquals (3 , indexMetadata .get (INDEX_NAME ).size ());
161
+
162
+ for (int shardId = 0 ; shardId < 3 ; shardId ++) {
163
+ List <RemoteStoreMetadata > shardMetadata = indexMetadata .get (INDEX_NAME ).get (shardId );
164
+ assertNotNull ("Metadata for shard " + shardId + " should not be null" , shardMetadata );
165
+ assertFalse ("Metadata for shard " + shardId + " should not be empty" , shardMetadata .isEmpty ());
166
+ validateSegmentMetadata (shardMetadata .get (0 ));
167
+ validateTranslogMetadata (shardMetadata .get (0 ));
168
+ }
169
+ }, 30 , TimeUnit .SECONDS );
170
+ }
171
+
172
+ private void indexDocs () {
173
+ try {
174
+ // Created sample documents with actual content
175
+ List <Map <String , Object >> documents = new ArrayList <>();
176
+
177
+ int numDocs = randomIntBetween (5 , 10 );
178
+ for (int i = 0 ; i < numDocs ; i ++) {
179
+ Map <String , Object > doc = new HashMap <>();
180
+ doc .put ("title" , "Test Document " + (i + 1 ));
181
+ doc .put ("content" , "This is test document number " + (i + 1 ) + " with some content." );
182
+ doc .put ("timestamp" , System .currentTimeMillis ());
183
+ doc .put ("doc_id" , i );
184
+ documents .add (doc );
185
+
186
+ IndexResponse response = client ().prepareIndex (INDEX_NAME )
187
+ .setSource (doc , XContentType .JSON )
188
+ .get ();
189
+
190
+ assertEquals (RestStatus .CREATED , response .status ());
191
+
192
+ if (randomBoolean ()) {
193
+ flush (INDEX_NAME );
194
+ } else {
195
+ refresh (INDEX_NAME );
196
+ }
197
+ }
198
+
199
+ // Ensure documents are indexed
200
+ refresh (INDEX_NAME );
201
+ verifyDocumentCount ();
202
+ } catch (Exception e ) {
203
+ logger .error ("Failed to index documents" , e );
204
+ fail ("Failed to index documents: " + e .getMessage ());
205
+ }
206
+ }
207
+
208
+ private void verifyDocumentCount () throws Exception {
209
+ assertBusy (() -> {
210
+ SearchResponse searchResponse = client ().prepareSearch (INDEX_NAME )
211
+ .setQuery (QueryBuilders .matchAllQuery ())
212
+ .get ();
213
+
214
+ assertTrue ("Documents should be indexed" , searchResponse .getHits ().getTotalHits ().value () > 0 );
215
+ }, 30 , TimeUnit .SECONDS );
216
+ }
217
+
218
+ @ SuppressWarnings ("unchecked" )
219
+ private void validateSegmentMetadata (RemoteStoreMetadata metadata ) {
220
+ Map <String , Object > metadataMap = toMap (metadata );
221
+ Map <String , Object > segments = (Map <String , Object >) metadataMap .get ("segments" );
222
+
223
+ assertNotNull ("Segments metadata should not be null" , segments );
224
+
225
+ if (!segments .isEmpty ()) {
226
+ segments .values ().forEach (value -> {
227
+ if (value instanceof Map ) {
228
+ Map <String , Object > segmentInfo = (Map <String , Object >) value ;
229
+ assertNotNull ("Generation should not be null" , segmentInfo .get ("generation" ));
230
+ assertNotNull ("Primary term should not be null" , segmentInfo .get ("primary_term" ));
231
+
232
+ Object uploadedSegmentsObj = segmentInfo .get ("uploaded_segments" );
233
+ if (uploadedSegmentsObj instanceof Map ) {
234
+ Map <String , Object > uploadedSegments = (Map <String , Object >) uploadedSegmentsObj ;
235
+ if (!uploadedSegments .isEmpty ()) {
236
+ uploadedSegments .values ().forEach (segmentFile -> {
237
+ if (segmentFile instanceof Map ) {
238
+ Map <String , Object > fileInfo = (Map <String , Object >) segmentFile ;
239
+ assertNotNull ("Original name should not be null" , fileInfo .get ("original_name" ));
240
+ assertNotNull ("Checksum should not be null" , fileInfo .get ("checksum" ));
241
+ assertNotNull ("Length should not be null" , fileInfo .get ("length" ));
242
+ }
243
+ });
244
+ }
245
+ }
246
+ }
247
+ });
248
+ }
249
+ }
250
+
251
+ @ SuppressWarnings ("unchecked" )
252
+ private void validateTranslogMetadata (RemoteStoreMetadata metadata ) {
253
+ Map <String , Object > metadataMap = toMap (metadata );
254
+ Map <String , Object > translog = (Map <String , Object >) metadataMap .get ("translog" );
255
+
256
+ assertNotNull ("Translog metadata should not be null" , translog );
257
+
258
+ if (!translog .isEmpty ()) {
259
+ translog .values ().forEach (value -> {
260
+ if (value instanceof Map ) {
261
+ Map <String , Object > translogInfo = (Map <String , Object >) value ;
262
+ assertNotNull ("Generation should not be null" , translogInfo .get ("generation" ));
263
+ assertNotNull ("Primary term should not be null" , translogInfo .get ("primary_term" ));
264
+ assertNotNull ("Min translog generation should not be null" , translogInfo .get ("min_translog_gen" ));
265
+
266
+ Object contentObj = translogInfo .get ("content" );
267
+ if (contentObj instanceof Map ) {
268
+ Map <String , Object > content = (Map <String , Object >) contentObj ;
269
+ assertNotNull ("Content should not be null" , content );
270
+ assertNotNull ("Content generation should not be null" , content .get ("generation" ));
271
+ assertNotNull ("Content primary term should not be null" , content .get ("primary_term" ));
272
+ assertNotNull ("Content min translog generation should not be null" ,
273
+ content .get ("min_translog_generation" ));
274
+ }
275
+ }
276
+ });
277
+ }
278
+ }
279
+
280
+ private Map <String , Object > toMap (RemoteStoreMetadata metadata ) {
281
+ try {
282
+ XContentBuilder builder = XContentFactory .jsonBuilder ();
283
+ metadata .toXContent (builder , ToXContent .EMPTY_PARAMS );
284
+ BytesReference bytes = BytesReference .bytes (builder );
285
+
286
+ try (XContentParser parser = XContentType .JSON .xContent ()
287
+ .createParser (xContentRegistry (), LoggingDeprecationHandler .INSTANCE , bytes .streamInput ())) {
288
+ return parser .map ();
289
+ }
290
+ } catch (Exception e ) {
291
+ throw new RuntimeException ("Failed to convert metadata to map" , e );
292
+ }
293
+ }
294
+
295
+ private void changeReplicaCountAndEnsureGreen (int replicaCount ) {
296
+ assertAcked (
297
+ client ().admin ()
298
+ .indices ()
299
+ .prepareUpdateSettings (INDEX_NAME )
300
+ .setSettings (Settings .builder ().put (SETTING_NUMBER_OF_REPLICAS , replicaCount ))
301
+ );
302
+ ensureYellowAndNoInitializingShards (INDEX_NAME );
303
+ ensureGreen (INDEX_NAME );
304
+ }
305
+ }