Skip to content

Commit dc2db9f

Browse files
committed
Implement fixed interval refresh task scheduling
Signed-off-by: Ashish Singh <[email protected]>
1 parent 8312e42 commit dc2db9f

File tree

8 files changed

+200
-4
lines changed

8 files changed

+200
-4
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1818
- Add ingestion management APIs for pause, resume and get ingestion state ([#17631](https://github.com/opensearch-project/OpenSearch/pull/17631))
1919
- [Security Manager Replacement] Enhance Java Agent to intercept System::exit ([#17746](https://github.com/opensearch-project/OpenSearch/pull/17746))
2020
- Support AutoExpand for SearchReplica ([#17741](https://github.com/opensearch-project/OpenSearch/pull/17741))
21+
- Implement fixed interval refresh task scheduling ([#17777](https://github.com/opensearch-project/OpenSearch/pull/17777))
2122

2223
### Changed
2324
- Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -820,7 +820,10 @@ public void apply(Settings value, Settings current, Settings previous) {
820820
),
821821
OpenSearchOnHeapCacheSettings.EXPIRE_AFTER_ACCESS_SETTING.getConcreteSettingForNamespace(
822822
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
823-
)
823+
),
824+
825+
// Setting related to refresh optimisations
826+
IndicesService.CLUSTER_REFRESH_FIXED_INTERVAL_SCHEDULE_ENABLED_SETTING
824827
)
825828
)
826829
);

server/src/main/java/org/opensearch/common/util/concurrent/AbstractAsyncTask.java

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,15 @@
3333

3434
import org.apache.logging.log4j.Logger;
3535
import org.apache.logging.log4j.message.ParameterizedMessage;
36+
import org.opensearch.common.Randomness;
3637
import org.opensearch.common.unit.TimeValue;
3738
import org.opensearch.threadpool.Scheduler;
3839
import org.opensearch.threadpool.ThreadPool;
3940

4041
import java.io.Closeable;
4142
import java.util.Objects;
4243
import java.util.concurrent.atomic.AtomicBoolean;
44+
import java.util.function.Supplier;
4345

4446
/**
4547
* A base class for tasks that need to repeat.
@@ -56,17 +58,31 @@ public abstract class AbstractAsyncTask implements Runnable, Closeable {
5658
private volatile boolean isScheduledOrRunning;
5759
private volatile Exception lastThrownException;
5860
private volatile TimeValue interval;
61+
private volatile long lastRunStartTimeNs = -1;
62+
private final Supplier<Boolean> fixedIntervalSchedulingEnabled;
5963

6064
protected AbstractAsyncTask(Logger logger, ThreadPool threadPool, TimeValue interval, boolean autoReschedule) {
65+
this(logger, threadPool, interval, autoReschedule, () -> Boolean.FALSE);
66+
}
67+
68+
protected AbstractAsyncTask(
69+
Logger logger,
70+
ThreadPool threadPool,
71+
TimeValue interval,
72+
boolean autoReschedule,
73+
Supplier<Boolean> fixedIntervalSchedulingEnabled
74+
) {
6175
this.logger = logger;
6276
this.threadPool = threadPool;
6377
this.interval = interval;
6478
this.autoReschedule = autoReschedule;
79+
this.fixedIntervalSchedulingEnabled = fixedIntervalSchedulingEnabled;
6580
}
6681

6782
/**
6883
* Change the interval between runs.
6984
* If a future run is scheduled then this will reschedule it.
85+
*
7086
* @param interval The new interval between runs.
7187
*/
7288
public synchronized void setInterval(TimeValue interval) {
@@ -85,6 +101,7 @@ public TimeValue getInterval() {
85101
* should be scheduled. This method does *not* need to test if
86102
* the task is closed, as being closed automatically prevents
87103
* scheduling.
104+
*
88105
* @return Should the task be scheduled to run?
89106
*/
90107
protected abstract boolean mustReschedule();
@@ -106,7 +123,7 @@ public synchronized void rescheduleIfNecessary() {
106123
if (logger.isTraceEnabled()) {
107124
logger.trace("scheduling {} every {}", toString(), interval);
108125
}
109-
cancellable = threadPool.schedule(this, interval, getThreadPool());
126+
cancellable = threadPool.schedule(this, getSleepDuration(), getThreadPool());
110127
isScheduledOrRunning = true;
111128
} else {
112129
logger.trace("scheduled {} disabled", toString());
@@ -156,6 +173,7 @@ public final void run() {
156173
isScheduledOrRunning = autoReschedule;
157174
}
158175
try {
176+
lastRunStartTimeNs = System.nanoTime();
159177
runInternal();
160178
} catch (Exception ex) {
161179
if (lastThrownException == null || sameException(lastThrownException, ex) == false) {
@@ -203,4 +221,34 @@ private static boolean sameException(Exception left, Exception right) {
203221
protected String getThreadPool() {
204222
return ThreadPool.Names.SAME;
205223
}
224+
225+
/**
226+
* Calculates the sleep duration for the next scheduled execution of the task.
227+
* This method determines the appropriate delay based on the last run time and the configured interval
228+
* to schedule the next execution.
229+
*/
230+
public TimeValue getSleepDuration() {
231+
if (!fixedIntervalSchedulingEnabled.get()) {
232+
return interval;
233+
}
234+
235+
if (lastRunStartTimeNs == -1) {
236+
// We want to stagger the start of refreshes in random manner so that we avoid refreshes to happen at the same
237+
// when we have refreshes happening in parallel for multiple shards of the same index. a.k.a. Dense shard packing
238+
long sleepTimeNs = Randomness.get().nextLong(interval.nanos());
239+
return TimeValue.timeValueNanos(sleepTimeNs);
240+
}
241+
242+
long timeSinceLastRunNs = System.nanoTime() - lastRunStartTimeNs;
243+
if (timeSinceLastRunNs >= interval.nanos()) {
244+
// If the time taken for refresh is more than the configured refresh interval, then we schedule the next refresh
245+
// immediately. This is to avoid the case where the time taken for refresh is more than the configured refresh
246+
// interval due to the processing of the refresh request.
247+
return TimeValue.ZERO;
248+
} else {
249+
// If the time taken for refresh is less than the configured refresh interval, then we schedule the next refresh
250+
// after the remaining time for the refresh interval.
251+
return TimeValue.timeValueNanos(interval.nanos() - timeSinceLastRunNs);
252+
}
253+
}
206254
}

server/src/main/java/org/opensearch/index/IndexModule.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -630,6 +630,7 @@ public IndexService newIndexService(
630630
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
631631
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
632632
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
633+
Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
633634
RecoverySettings recoverySettings,
634635
RemoteStoreSettings remoteStoreSettings
635636
) throws IOException {
@@ -653,6 +654,7 @@ public IndexService newIndexService(
653654
remoteDirectoryFactory,
654655
translogFactorySupplier,
655656
clusterDefaultRefreshIntervalSupplier,
657+
fixedRefreshIntervalSchedulingEnabled,
656658
recoverySettings,
657659
remoteStoreSettings,
658660
(s) -> {},
@@ -680,6 +682,7 @@ public IndexService newIndexService(
680682
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
681683
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
682684
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
685+
Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
683686
RecoverySettings recoverySettings,
684687
RemoteStoreSettings remoteStoreSettings,
685688
Consumer<IndexShard> replicator,
@@ -741,6 +744,7 @@ public IndexService newIndexService(
741744
recoveryStateFactory,
742745
translogFactorySupplier,
743746
clusterDefaultRefreshIntervalSupplier,
747+
fixedRefreshIntervalSchedulingEnabled,
744748
recoverySettings,
745749
remoteStoreSettings,
746750
fileCache,

server/src/main/java/org/opensearch/index/IndexService.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
192192
private final ValuesSourceRegistry valuesSourceRegistry;
193193
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
194194
private final Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier;
195+
private final Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled;
195196
private final RecoverySettings recoverySettings;
196197
private final RemoteStoreSettings remoteStoreSettings;
197198
private final FileCache fileCache;
@@ -232,6 +233,7 @@ public IndexService(
232233
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
233234
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
234235
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
236+
Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
235237
RecoverySettings recoverySettings,
236238
RemoteStoreSettings remoteStoreSettings,
237239
FileCache fileCache,
@@ -307,6 +309,7 @@ public IndexService(
307309
this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners);
308310
this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners);
309311
this.clusterDefaultRefreshIntervalSupplier = clusterDefaultRefreshIntervalSupplier;
312+
this.fixedRefreshIntervalSchedulingEnabled = fixedRefreshIntervalSchedulingEnabled;
310313
// kick off async ops for the first shard in this index
311314
this.refreshTask = new AsyncRefreshTask(this);
312315
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
@@ -361,6 +364,7 @@ public IndexService(
361364
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
362365
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
363366
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
367+
Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
364368
RecoverySettings recoverySettings,
365369
RemoteStoreSettings remoteStoreSettings
366370
) {
@@ -397,6 +401,7 @@ public IndexService(
397401
recoveryStateFactory,
398402
translogFactorySupplier,
399403
clusterDefaultRefreshIntervalSupplier,
404+
fixedRefreshIntervalSchedulingEnabled,
400405
recoverySettings,
401406
remoteStoreSettings,
402407
null,
@@ -1316,7 +1321,11 @@ abstract static class BaseAsyncTask extends AbstractAsyncTask {
13161321
protected final IndexService indexService;
13171322

13181323
BaseAsyncTask(final IndexService indexService, final TimeValue interval) {
1319-
super(indexService.logger, indexService.threadPool, interval, true);
1324+
this(indexService, interval, () -> Boolean.FALSE);
1325+
}
1326+
1327+
BaseAsyncTask(final IndexService indexService, final TimeValue interval, Supplier<Boolean> fixedIntervalSchedulingEnabled) {
1328+
super(indexService.logger, indexService.threadPool, interval, true, fixedIntervalSchedulingEnabled);
13201329
this.indexService = indexService;
13211330
rescheduleIfNecessary();
13221331
}
@@ -1366,7 +1375,7 @@ public String toString() {
13661375
final class AsyncRefreshTask extends BaseAsyncTask {
13671376

13681377
AsyncRefreshTask(IndexService indexService) {
1369-
super(indexService, indexService.getRefreshInterval());
1378+
super(indexService, indexService.getRefreshInterval(), fixedRefreshIntervalSchedulingEnabled);
13701379
}
13711380

13721381
@Override

server/src/main/java/org/opensearch/indices/IndicesService.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,17 @@ public class IndicesService extends AbstractLifecycleComponent
290290
Property.Dynamic
291291
);
292292

293+
/**
294+
* This setting is used to enable fixed interval scheduling capability for refresh tasks to ensure consistent intervals
295+
* between refreshes.
296+
*/
297+
public static final Setting<Boolean> CLUSTER_REFRESH_FIXED_INTERVAL_SCHEDULE_ENABLED_SETTING = Setting.boolSetting(
298+
"cluster.index.refresh.fixed_interval_scheduling.enabled",
299+
false,
300+
Property.NodeScope,
301+
Property.Dynamic
302+
);
303+
293304
/**
294305
* This setting is used to restrict creation or updation of index where the `index.translog.durability` index setting
295306
* is set as ASYNC if enabled. If disabled, any of the durability mode can be used and switched at any later time from
@@ -363,6 +374,7 @@ public class IndicesService extends AbstractLifecycleComponent
363374
private final IndexStorePlugin.DirectoryFactory remoteDirectoryFactory;
364375
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
365376
private volatile TimeValue clusterDefaultRefreshInterval;
377+
private volatile boolean fixedRefreshIntervalSchedulingEnabled;
366378
private final SearchRequestStats searchRequestStats;
367379
private final FileCache fileCache;
368380
private final CompositeIndexSettings compositeIndexSettings;
@@ -514,6 +526,15 @@ protected void closeInternal() {
514526
this.clusterDefaultRefreshInterval = CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.get(clusterService.getSettings());
515527
clusterService.getClusterSettings()
516528
.addSettingsUpdateConsumer(CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING, this::onRefreshIntervalUpdate);
529+
this.fixedRefreshIntervalSchedulingEnabled = CLUSTER_REFRESH_FIXED_INTERVAL_SCHEDULE_ENABLED_SETTING.get(
530+
clusterService.getSettings()
531+
);
532+
clusterService.getClusterSettings()
533+
.addSettingsUpdateConsumer(
534+
CLUSTER_REFRESH_FIXED_INTERVAL_SCHEDULE_ENABLED_SETTING,
535+
this::setFixedRefreshIntervalSchedulingEnabled
536+
);
537+
517538
this.recoverySettings = recoverySettings;
518539
this.remoteStoreSettings = remoteStoreSettings;
519540
this.compositeIndexSettings = compositeIndexSettings;
@@ -1006,6 +1027,7 @@ private synchronized IndexService createIndexService(
10061027
remoteDirectoryFactory,
10071028
translogFactorySupplier,
10081029
this::getClusterDefaultRefreshInterval,
1030+
this::isFixedRefreshIntervalSchedulingEnabled,
10091031
this.recoverySettings,
10101032
this.remoteStoreSettings,
10111033
replicator,
@@ -2167,4 +2189,12 @@ public CompositeIndexSettings getCompositeIndexSettings() {
21672189
void setMaxSizeInRequestCache(Integer maxSizeInRequestCache) {
21682190
this.maxSizeInRequestCache = maxSizeInRequestCache;
21692191
}
2192+
2193+
public void setFixedRefreshIntervalSchedulingEnabled(boolean fixedRefreshIntervalSchedulingEnabled) {
2194+
this.fixedRefreshIntervalSchedulingEnabled = fixedRefreshIntervalSchedulingEnabled;
2195+
}
2196+
2197+
private boolean isFixedRefreshIntervalSchedulingEnabled() {
2198+
return fixedRefreshIntervalSchedulingEnabled;
2199+
}
21702200
}

server/src/test/java/org/opensearch/index/IndexModuleTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ private IndexService newIndexService(IndexModule module) throws IOException {
263263
new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool, ""),
264264
translogFactorySupplier,
265265
() -> IndexSettings.DEFAULT_REFRESH_INTERVAL,
266+
() -> Boolean.FALSE,
266267
DefaultRecoverySettings.INSTANCE,
267268
DefaultRemoteStoreSettings.INSTANCE,
268269
s -> {},

server/src/test/java/org/opensearch/index/IndexServiceTests.java

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -636,6 +636,106 @@ public void testReplicationTask() throws Exception {
636636
assertEquals(1000, updatedTask.getInterval().millis());
637637
}
638638

639+
public void testBaseAsyncTaskWithFixedIntervalDisabled() throws Exception {
640+
IndexService indexService = createIndex("test", Settings.EMPTY);
641+
CountDownLatch latch = new CountDownLatch(1);
642+
try (
643+
IndexService.BaseAsyncTask task = new IndexService.BaseAsyncTask(
644+
indexService,
645+
TimeValue.timeValueSeconds(5),
646+
() -> Boolean.FALSE
647+
) {
648+
@Override
649+
protected void runInternal() {
650+
try {
651+
Thread.sleep(2000);
652+
latch.countDown();
653+
} catch (InterruptedException e) {
654+
throw new AssertionError(e);
655+
}
656+
}
657+
}
658+
) {
659+
// With refresh fixed interval disabled, the sleep duration is always the refresh interval
660+
long sleepDuration = task.getSleepDuration().seconds();
661+
assertEquals(5, sleepDuration);
662+
task.run();
663+
latch.await();
664+
sleepDuration = task.getSleepDuration().seconds();
665+
assertEquals(0, latch.getCount());
666+
indexService.close("test", false);
667+
assertEquals(5, sleepDuration);
668+
}
669+
}
670+
671+
public void testBaseAsyncTaskWithFixedIntervalEnabled() throws Exception {
672+
IndexService indexService = createIndex("test", Settings.EMPTY);
673+
CountDownLatch latch = new CountDownLatch(1);
674+
try (
675+
IndexService.BaseAsyncTask task = new IndexService.BaseAsyncTask(
676+
indexService,
677+
TimeValue.timeValueSeconds(5),
678+
() -> Boolean.TRUE
679+
) {
680+
@Override
681+
protected void runInternal() {
682+
try {
683+
Thread.sleep(2000);
684+
latch.countDown();
685+
} catch (InterruptedException e) {
686+
throw new AssertionError(e);
687+
}
688+
}
689+
}
690+
) {
691+
// In zero state, we have a random sleep duration
692+
long sleepDurationMs = task.getSleepDuration().millis();
693+
assertTrue(sleepDurationMs > 0);
694+
task.run();
695+
latch.await();
696+
// Since we have refresh taking up 2s, then the next refresh should have sleep duration of 3s. Here we check
697+
// the sleep duration to be non-zero since the sleep duration is calculated dynamically.
698+
sleepDurationMs = task.getSleepDuration().millis();
699+
assertTrue(sleepDurationMs > 0);
700+
assertEquals(0, latch.getCount());
701+
indexService.close("test", false);
702+
assertBusy(() -> { assertEquals(TimeValue.ZERO, task.getSleepDuration()); });
703+
}
704+
}
705+
706+
public void testBaseAsyncTaskWithFixedIntervalEnabledAndLongerRefresh() throws Exception {
707+
IndexService indexService = createIndex("test", Settings.EMPTY);
708+
CountDownLatch latch = new CountDownLatch(1);
709+
try (
710+
IndexService.BaseAsyncTask task = new IndexService.BaseAsyncTask(
711+
indexService,
712+
TimeValue.timeValueSeconds(1),
713+
() -> Boolean.TRUE
714+
) {
715+
@Override
716+
protected void runInternal() {
717+
try {
718+
Thread.sleep(2000);
719+
latch.countDown();
720+
} catch (InterruptedException e) {
721+
throw new AssertionError(e);
722+
}
723+
}
724+
}
725+
) {
726+
// In zero state, we have a random sleep duration
727+
long sleepDurationMs = task.getSleepDuration().millis();
728+
assertTrue(sleepDurationMs > 0);
729+
task.run();
730+
latch.await();
731+
indexService.close("test", false);
732+
// Since we have refresh taking up 2s and refresh interval as 1s, then the next refresh should happen immediately.
733+
sleepDurationMs = task.getSleepDuration().millis();
734+
assertEquals(0, sleepDurationMs);
735+
assertEquals(0, latch.getCount());
736+
}
737+
}
738+
639739
@Override
640740
protected Settings featureFlagSettings() {
641741
return Settings.builder()

0 commit comments

Comments
 (0)