35
35
import java .nio .file .DirectoryStream ;
36
36
import java .nio .file .Files ;
37
37
import java .nio .file .Path ;
38
- import java .util .*;
38
+ import java .util .ArrayList ;
39
+ import java .util .Collections ;
40
+ import java .util .HashMap ;
41
+ import java .util .List ;
42
+ import java .util .Map ;
43
+ import java .util .Objects ;
44
+ import java .util .Arrays ;
39
45
import java .util .concurrent .atomic .AtomicInteger ;
40
46
import java .util .stream .Collectors ;
41
47
@@ -77,7 +83,7 @@ public TransportRemoteStoreMetadataAction(
77
83
protected void doExecute (Task task , RemoteStoreMetadataRequest request , ActionListener <RemoteStoreMetadataResponse > listener ) {
78
84
try {
79
85
ClusterState state = clusterService .state ();
80
-
86
+
81
87
// Check blocks
82
88
ClusterBlockException blockException = checkBlocks (state , request );
83
89
if (blockException != null ) {
@@ -88,14 +94,13 @@ protected void doExecute(Task task, RemoteStoreMetadataRequest request, ActionLi
88
94
// Resolve concrete indices
89
95
String [] concreteIndices = indexNameExpressionResolver .concreteIndexNames (state , request );
90
96
if (concreteIndices .length == 0 ) {
91
- listener .onResponse (new RemoteStoreMetadataResponse (
92
- new RemoteStoreMetadata [0 ], 0 , 0 , 0 , Collections .emptyList ()));
97
+ listener .onResponse (new RemoteStoreMetadataResponse (new RemoteStoreMetadata [0 ], 0 , 0 , 0 , Collections .emptyList ()));
93
98
return ;
94
99
}
95
100
96
101
// Get relevant shards
97
102
List <ShardRouting > selectedShards = getSelectedShards (state , request , concreteIndices );
98
-
103
+
99
104
// Process each shard
100
105
List <RemoteStoreMetadata > responses = new ArrayList <>();
101
106
AtomicInteger successfulShards = new AtomicInteger (0 );
@@ -109,10 +114,9 @@ protected void doExecute(Task task, RemoteStoreMetadataRequest request, ActionLi
109
114
successfulShards .incrementAndGet ();
110
115
} catch (Exception e ) {
111
116
failedShards .incrementAndGet ();
112
- shardFailures .add (new DefaultShardOperationFailedException (
113
- shardRouting .shardId ().getIndexName (),
114
- shardRouting .shardId ().getId (),
115
- e ));
117
+ shardFailures .add (
118
+ new DefaultShardOperationFailedException (shardRouting .shardId ().getIndexName (), shardRouting .shardId ().getId (), e )
119
+ );
116
120
}
117
121
}
118
122
@@ -123,7 +127,7 @@ protected void doExecute(Task task, RemoteStoreMetadataRequest request, ActionLi
123
127
failedShards .get (),
124
128
shardFailures
125
129
);
126
-
130
+
127
131
listener .onResponse (response );
128
132
129
133
} catch (Exception e ) {
@@ -141,22 +145,23 @@ private ClusterBlockException checkBlocks(ClusterState state, RemoteStoreMetadat
141
145
return state .blocks ().indicesBlockedException (ClusterBlockLevel .METADATA_READ , concreteIndices );
142
146
}
143
147
144
- private List <ShardRouting > getSelectedShards (
145
- ClusterState clusterState ,
146
- RemoteStoreMetadataRequest request ,
147
- String [] concreteIndices
148
- ) {
148
+ private List <ShardRouting > getSelectedShards (ClusterState clusterState , RemoteStoreMetadataRequest request , String [] concreteIndices ) {
149
149
return clusterState .routingTable ()
150
150
.allShards (concreteIndices )
151
151
.getShardRoutings ()
152
152
.stream ()
153
- .filter (shardRouting -> request .shards ().length == 0 ||
154
- Arrays .asList (request .shards ()).contains (Integer .toString (shardRouting .shardId ().id ())))
155
- .filter (shardRouting -> !request .local () ||
156
- Objects .equals (shardRouting .currentNodeId (), clusterState .getNodes ().getLocalNodeId ()))
157
- .filter (shardRouting -> Boolean .parseBoolean (
158
- clusterState .getMetadata ().index (shardRouting .index ())
159
- .getSettings ().get (IndexMetadata .SETTING_REMOTE_STORE_ENABLED )))
153
+ .filter (
154
+ shardRouting -> request .shards ().length == 0
155
+ || Arrays .asList (request .shards ()).contains (Integer .toString (shardRouting .shardId ().id ()))
156
+ )
157
+ .filter (
158
+ shardRouting -> !request .local () || Objects .equals (shardRouting .currentNodeId (), clusterState .getNodes ().getLocalNodeId ())
159
+ )
160
+ .filter (
161
+ shardRouting -> Boolean .parseBoolean (
162
+ clusterState .getMetadata ().index (shardRouting .index ()).getSettings ().get (IndexMetadata .SETTING_REMOTE_STORE_ENABLED )
163
+ )
164
+ )
160
165
.collect (Collectors .toList ());
161
166
}
162
167
@@ -168,9 +173,7 @@ private RemoteStoreMetadata getShardMetadata(ShardRouting shardRouting) throws I
168
173
throw new ShardNotFoundException (indexShard .shardId ());
169
174
}
170
175
171
- String repoLocation = clusterService .localNode ()
172
- .getAttributes ()
173
- .get ("remote_store.repository.my-repository.settings.location" );
176
+ String repoLocation = clusterService .localNode ().getAttributes ().get ("remote_store.repository.my-repository.settings.location" );
174
177
175
178
String indexUUID = shardRouting .index ().getUUID ();
176
179
int shardId = shardRouting .shardId ().id ();
@@ -191,8 +194,10 @@ private Map<String, Object> readSegmentMetadata(String metadataPath) throws IOEx
191
194
192
195
try (DirectoryStream <Path > stream = Files .newDirectoryStream (path , "metadata__*" )) {
193
196
for (Path metadataFile : stream ) {
194
- try (InputStream in = Files .newInputStream (metadataFile );
195
- IndexInput idxIn = new ByteArrayIndexInput (metadataFile .getFileName ().toString (), in .readAllBytes ())) {
197
+ try (
198
+ InputStream in = Files .newInputStream (metadataFile );
199
+ IndexInput idxIn = new ByteArrayIndexInput (metadataFile .getFileName ().toString (), in .readAllBytes ())
200
+ ) {
196
201
197
202
RemoteSegmentMetadata segMetadata = metadataStreamWrapper .readStream (idxIn );
198
203
Map <String , Object > fileMetadata = new HashMap <>();
@@ -212,15 +217,25 @@ private Map<String, Object> readSegmentMetadata(String metadataPath) throws IOEx
212
217
213
218
if (segMetadata .getReplicationCheckpoint () != null ) {
214
219
var cp = segMetadata .getReplicationCheckpoint ();
215
- fileMetadata .put ("replication_checkpoint" , Map .of (
216
- "shard_id" , cp .getShardId ().toString (),
217
- "primary_term" , cp .getPrimaryTerm (),
218
- "generation" , cp .getSegmentsGen (),
219
- "version" , cp .getSegmentInfosVersion (),
220
- "length" , cp .getLength (),
221
- "codec" , cp .getCodec (),
222
- "created_timestamp" , cp .getCreatedTimeStamp ()
223
- ));
220
+ fileMetadata .put (
221
+ "replication_checkpoint" ,
222
+ Map .of (
223
+ "shard_id" ,
224
+ cp .getShardId ().toString (),
225
+ "primary_term" ,
226
+ cp .getPrimaryTerm (),
227
+ "generation" ,
228
+ cp .getSegmentsGen (),
229
+ "version" ,
230
+ cp .getSegmentInfosVersion (),
231
+ "length" ,
232
+ cp .getLength (),
233
+ "codec" ,
234
+ cp .getCodec (),
235
+ "created_timestamp" ,
236
+ cp .getCreatedTimeStamp ()
237
+ )
238
+ );
224
239
}
225
240
226
241
fileMetadata .put ("generation" , segMetadata .getGeneration ());
@@ -239,8 +254,10 @@ private Map<String, Object> readTranslogMetadata(String metadataPath) throws IOE
239
254
240
255
try (DirectoryStream <Path > stream = Files .newDirectoryStream (path , "metadata__*" )) {
241
256
for (Path metadataFile : stream ) {
242
- try (InputStream inputStream = Files .newInputStream (metadataFile );
243
- BytesStreamInput input = new BytesStreamInput (inputStream .readAllBytes ())) {
257
+ try (
258
+ InputStream inputStream = Files .newInputStream (metadataFile );
259
+ BytesStreamInput input = new BytesStreamInput (inputStream .readAllBytes ())
260
+ ) {
244
261
245
262
Map <String , Object > fileMetadata = new HashMap <>();
246
263
String [] parts = metadataFile .getFileName ().toString ().split (TranslogTransferMetadata .METADATA_SEPARATOR );
@@ -256,16 +273,23 @@ private Map<String, Object> readTranslogMetadata(String metadataPath) throws IOE
256
273
long generation = input .readLong ();
257
274
long minTranslogGen = input .readLong ();
258
275
Map <String , String > genToTermMap = input .readMap (StreamInput ::readString , StreamInput ::readString );
259
- fileMetadata .put ("content" , Map .of (
260
- "primary_term" , primaryTerm ,
261
- "generation" , generation ,
262
- "min_translog_generation" , minTranslogGen ,
263
- "generation_to_term_mapping" , genToTermMap
264
- ));
276
+ fileMetadata .put (
277
+ "content" ,
278
+ Map .of (
279
+ "primary_term" ,
280
+ primaryTerm ,
281
+ "generation" ,
282
+ generation ,
283
+ "min_translog_generation" ,
284
+ minTranslogGen ,
285
+ "generation_to_term_mapping" ,
286
+ genToTermMap
287
+ )
288
+ );
265
289
metadata .put (metadataFile .getFileName ().toString (), fileMetadata );
266
290
}
267
291
}
268
292
}
269
293
return metadata ;
270
294
}
271
- }
295
+ }
0 commit comments