@@ -89,8 +89,7 @@ public class DiskThresholdMonitor {
89
89
private final Supplier <Double > dataToFileCacheSizeRatioSupplier ;
90
90
private final LongSupplier currentTimeMillisSupplier ;
91
91
private final RerouteService rerouteService ;
92
- private final HotNodeDiskThresholdEvaluator hotNodeEvaluator ;
93
- private final WarmNodeDiskThresholdEvaluator warmNodeEvaluator ;
92
+ private final NodeDiskEvaluator nodeDiskEvaluator ;
94
93
private final AtomicLong lastRunTimeMillis = new AtomicLong (Long .MIN_VALUE );
95
94
private final AtomicBoolean checkInProgress = new AtomicBoolean ();
96
95
@@ -126,8 +125,12 @@ public DiskThresholdMonitor(
126
125
this .rerouteService = rerouteService ;
127
126
this .diskThresholdSettings = new DiskThresholdSettings (settings , clusterSettings );
128
127
this .client = client ;
129
- this .hotNodeEvaluator = new HotNodeDiskThresholdEvaluator (diskThresholdSettings );
130
- this .warmNodeEvaluator = new WarmNodeDiskThresholdEvaluator (diskThresholdSettings , dataToFileCacheSizeRatioSupplier );
128
+ DiskThresholdEvaluator hotNodeEvaluator = new HotNodeDiskThresholdEvaluator (diskThresholdSettings );
129
+ DiskThresholdEvaluator warmNodeEvaluator = new WarmNodeDiskThresholdEvaluator (
130
+ diskThresholdSettings ,
131
+ dataToFileCacheSizeRatioSupplier
132
+ );
133
+ this .nodeDiskEvaluator = new NodeDiskEvaluator (hotNodeEvaluator , warmNodeEvaluator );
131
134
this .dataToFileCacheSizeRatioSupplier = dataToFileCacheSizeRatioSupplier ;
132
135
}
133
136
@@ -187,7 +190,7 @@ public void onNewInfo(ClusterInfo info) {
187
190
usage = getWarmDiskUsage (usage , info , routingNode , state );
188
191
}
189
192
190
- if (isNodeExceedingFloodStageWatermark (usage , isWarmNode )) {
193
+ if (nodeDiskEvaluator . isNodeExceedingFloodStageWatermark (usage , isWarmNode )) {
191
194
192
195
nodesOverLowThreshold .add (node );
193
196
nodesOverHighThreshold .add (node );
@@ -210,7 +213,7 @@ public void onNewInfo(ClusterInfo info) {
210
213
continue ;
211
214
}
212
215
213
- if (isNodeExceedingHighWatermark (usage , isWarmNode )) {
216
+ if (nodeDiskEvaluator . isNodeExceedingHighWatermark (usage , isWarmNode )) {
214
217
215
218
if (routingNode != null ) { // might be temporarily null if the ClusterInfoService and the ClusterService are out of step
216
219
for (ShardRouting routing : routingNode ) {
@@ -229,7 +232,7 @@ public void onNewInfo(ClusterInfo info) {
229
232
Math .max (0L , usage .getFreeBytes () - reservedSpace )
230
233
);
231
234
232
- if (isNodeExceedingHighWatermark (usageWithReservedSpace , isWarmNode )) {
235
+ if (nodeDiskEvaluator . isNodeExceedingHighWatermark (usageWithReservedSpace , isWarmNode )) {
233
236
234
237
nodesOverLowThreshold .add (node );
235
238
nodesOverHighThreshold .add (node );
@@ -247,7 +250,7 @@ public void onNewInfo(ClusterInfo info) {
247
250
);
248
251
}
249
252
250
- } else if (isNodeExceedingLowWatermark (usage , isWarmNode )) {
253
+ } else if (nodeDiskEvaluator . isNodeExceedingLowWatermark (usage , isWarmNode )) {
251
254
252
255
nodesOverHighThresholdAndRelocating .remove (node );
253
256
@@ -328,7 +331,7 @@ public void onNewInfo(ClusterInfo info) {
328
331
}
329
332
330
333
boolean isNodeWarm = REMOTE_CAPABLE .equals (getNodePool (routingNode ));
331
- if (isNodeExceedingHighWatermark (usageIncludingRelocations , isNodeWarm )) {
334
+ if (nodeDiskEvaluator . isNodeExceedingHighWatermark (usageIncludingRelocations , isNodeWarm )) {
332
335
333
336
nodesOverHighThresholdAndRelocating .remove (diskUsage .getNodeId ());
334
337
logger .warn (
@@ -431,21 +434,6 @@ long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, Cluste
431
434
);
432
435
}
433
436
434
- private boolean isNodeExceedingFloodStageWatermark (DiskUsage diskUsage , boolean isWarmNode ) {
435
- DiskThresholdEvaluator evaluator = isWarmNode ? warmNodeEvaluator : hotNodeEvaluator ;
436
- return evaluator .isNodeExceedingFloodStageWatermark (diskUsage );
437
- }
438
-
439
- private boolean isNodeExceedingHighWatermark (DiskUsage diskUsage , boolean isWarmNode ) {
440
- DiskThresholdEvaluator evaluator = isWarmNode ? warmNodeEvaluator : hotNodeEvaluator ;
441
- return evaluator .isNodeExceedingHighWatermark (diskUsage );
442
- }
443
-
444
- private boolean isNodeExceedingLowWatermark (DiskUsage diskUsage , boolean isWarmNode ) {
445
- DiskThresholdEvaluator evaluator = isWarmNode ? warmNodeEvaluator : hotNodeEvaluator ;
446
- return evaluator .isNodeExceedingLowWatermark (diskUsage );
447
- }
448
-
449
437
private DiskUsage getWarmDiskUsage (DiskUsage diskUsage , ClusterInfo info , RoutingNode node , ClusterState state ) {
450
438
double dataToFileCacheSizeRatio = dataToFileCacheSizeRatioSupplier .get ();
451
439
AggregateFileCacheStats fileCacheStats = info .getNodeFileCacheStats ().getOrDefault (diskUsage .getNodeId (), null );
0 commit comments