Skip to content

Implement fixed interval refresh task scheduling #17777

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add ingestion management APIs for pause, resume and get ingestion state ([#17631](https://github.com/opensearch-project/OpenSearch/pull/17631))
- [Security Manager Replacement] Enhance Java Agent to intercept System::exit ([#17746](https://github.com/opensearch-project/OpenSearch/pull/17746))
- Support AutoExpand for SearchReplica ([#17741](https://github.com/opensearch-project/OpenSearch/pull/17741))
- Implement fixed interval refresh task scheduling ([#17777](https://github.com/opensearch-project/OpenSearch/pull/17777))

### Changed
- Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,10 @@ public void apply(Settings value, Settings current, Settings previous) {
),
OpenSearchOnHeapCacheSettings.EXPIRE_AFTER_ACCESS_SETTING.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
)
),

// Setting related to refresh optimisations
IndicesService.CLUSTER_REFRESH_FIXED_INTERVAL_SCHEDULE_ENABLED_SETTING
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.common.Randomness;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/**
* A base class for tasks that need to repeat.
Expand All @@ -56,17 +58,31 @@ public abstract class AbstractAsyncTask implements Runnable, Closeable {
private volatile boolean isScheduledOrRunning;
private volatile Exception lastThrownException;
private volatile TimeValue interval;
private volatile long lastRunStartTimeNs = -1;
private final Supplier<Boolean> fixedIntervalSchedulingEnabled;

/**
 * Creates a task with fixed interval scheduling disabled: the delegate constructor receives a
 * supplier that always yields {@code Boolean.FALSE}, so {@link #getSleepDuration()} returns the
 * configured interval unchanged.
 *
 * @param logger         logger handed to the delegate constructor
 * @param threadPool     thread pool handed to the delegate constructor
 * @param interval       the interval between runs
 * @param autoReschedule forwarded as-is to the delegate constructor
 */
protected AbstractAsyncTask(Logger logger, ThreadPool threadPool, TimeValue interval, boolean autoReschedule) {
this(logger, threadPool, interval, autoReschedule, () -> Boolean.FALSE);
}

/**
 * Creates a task whose scheduling mode is decided by the given supplier each time a sleep
 * duration is computed.
 *
 * @param logger                         logger stored on the task
 * @param threadPool                     thread pool stored on the task
 * @param interval                       the interval between runs
 * @param autoReschedule                 stored on the task as its auto-reschedule flag
 * @param fixedIntervalSchedulingEnabled supplies {@code true} when {@link #getSleepDuration()} should
 *                                       measure the delay from the start of the previous run, and
 *                                       {@code false} when it should simply return the interval
 * @throws NullPointerException if {@code fixedIntervalSchedulingEnabled} is {@code null}
 */
protected AbstractAsyncTask(
    Logger logger,
    ThreadPool threadPool,
    TimeValue interval,
    boolean autoReschedule,
    Supplier<Boolean> fixedIntervalSchedulingEnabled
) {
    this.logger = logger;
    this.threadPool = threadPool;
    this.interval = interval;
    this.autoReschedule = autoReschedule;
    // Fail fast here: a null supplier would otherwise only surface later as an anonymous
    // NullPointerException inside getSleepDuration().
    this.fixedIntervalSchedulingEnabled = Objects.requireNonNull(
        fixedIntervalSchedulingEnabled,
        "fixedIntervalSchedulingEnabled must not be null"
    );
}

/**
* Change the interval between runs.
* If a future run is scheduled then this will reschedule it.
*
* @param interval The new interval between runs.
*/
public synchronized void setInterval(TimeValue interval) {
Expand All @@ -85,6 +101,7 @@ public TimeValue getInterval() {
* should be scheduled. This method does *not* need to test if
* the task is closed, as being closed automatically prevents
* scheduling.
*
* @return Should the task be scheduled to run?
*/
protected abstract boolean mustReschedule();
Expand All @@ -106,7 +123,7 @@ public synchronized void rescheduleIfNecessary() {
if (logger.isTraceEnabled()) {
logger.trace("scheduling {} every {}", toString(), interval);
}
cancellable = threadPool.schedule(this, interval, getThreadPool());
cancellable = threadPool.schedule(this, getSleepDuration(), getThreadPool());
isScheduledOrRunning = true;
} else {
logger.trace("scheduled {} disabled", toString());
Expand Down Expand Up @@ -156,6 +173,7 @@ public final void run() {
isScheduledOrRunning = autoReschedule;
}
try {
lastRunStartTimeNs = System.nanoTime();
runInternal();
} catch (Exception ex) {
if (lastThrownException == null || sameException(lastThrownException, ex) == false) {
Expand Down Expand Up @@ -203,4 +221,34 @@ private static boolean sameException(Exception left, Exception right) {
protected String getThreadPool() {
return ThreadPool.Names.SAME;
}

/**
 * Calculates the delay before the next scheduled execution of this task.
 * <ul>
 *   <li>When fixed interval scheduling is disabled, the configured interval is returned as-is.</li>
 *   <li>When it is enabled and the task has not run yet, a random delay in {@code [0, interval)} is
 *       returned so that tasks created at the same moment do not all fire together.</li>
 *   <li>Otherwise the delay is the part of the interval that is still left, measured from the start
 *       of the previous run, or {@link TimeValue#ZERO} when the interval has already elapsed.</li>
 * </ul>
 *
 * @return the time to wait before the next execution
 */
public TimeValue getSleepDuration() {
    // Read the volatile interval exactly once. This method is not synchronized, so a concurrent
    // setInterval() between the comparison and the subtraction below could otherwise make the two
    // disagree and produce a negative remaining time.
    final TimeValue currentInterval = interval;
    if (!fixedIntervalSchedulingEnabled.get()) {
        return currentInterval;
    }

    final long intervalNs = currentInterval.nanos();
    final long lastStartNs = lastRunStartTimeNs;
    if (lastStartNs == -1) {
        if (intervalNs <= 0) {
            // nextLong(bound) rejects a non-positive bound; there is nothing to stagger over, so fall
            // back to the configured interval exactly as the non-fixed mode does.
            return currentInterval;
        }
        // Stagger the first refresh randomly so that refreshes for multiple shards of the same index
        // (a.k.a. dense shard packing) do not all happen at the same time.
        return TimeValue.timeValueNanos(Randomness.get().nextLong(intervalNs));
    }

    final long elapsedSinceLastStartNs = System.nanoTime() - lastStartNs;
    if (elapsedSinceLastStartNs >= intervalNs) {
        // The previous run started at least one full interval ago (for example because the refresh
        // itself took longer than the interval), so the next refresh is scheduled immediately.
        return TimeValue.ZERO;
    }
    // The previous run started less than one interval ago: wait only for the remainder of the interval.
    return TimeValue.timeValueNanos(intervalNs - elapsedSinceLastStartNs);
}
}
4 changes: 4 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,7 @@ public IndexService newIndexService(
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings
) throws IOException {
Expand All @@ -653,6 +654,7 @@ public IndexService newIndexService(
remoteDirectoryFactory,
translogFactorySupplier,
clusterDefaultRefreshIntervalSupplier,
fixedRefreshIntervalSchedulingEnabled,
recoverySettings,
remoteStoreSettings,
(s) -> {},
Expand Down Expand Up @@ -680,6 +682,7 @@ public IndexService newIndexService(
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings,
Consumer<IndexShard> replicator,
Expand Down Expand Up @@ -741,6 +744,7 @@ public IndexService newIndexService(
recoveryStateFactory,
translogFactorySupplier,
clusterDefaultRefreshIntervalSupplier,
fixedRefreshIntervalSchedulingEnabled,
recoverySettings,
remoteStoreSettings,
fileCache,
Expand Down
13 changes: 11 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final ValuesSourceRegistry valuesSourceRegistry;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
private final Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier;
private final Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled;
private final RecoverySettings recoverySettings;
private final RemoteStoreSettings remoteStoreSettings;
private final FileCache fileCache;
Expand Down Expand Up @@ -232,6 +233,7 @@ public IndexService(
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings,
FileCache fileCache,
Expand Down Expand Up @@ -307,6 +309,7 @@ public IndexService(
this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners);
this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners);
this.clusterDefaultRefreshIntervalSupplier = clusterDefaultRefreshIntervalSupplier;
this.fixedRefreshIntervalSchedulingEnabled = fixedRefreshIntervalSchedulingEnabled;
// kick off async ops for the first shard in this index
this.refreshTask = new AsyncRefreshTask(this);
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
Expand Down Expand Up @@ -361,6 +364,7 @@ public IndexService(
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings
) {
Expand Down Expand Up @@ -397,6 +401,7 @@ public IndexService(
recoveryStateFactory,
translogFactorySupplier,
clusterDefaultRefreshIntervalSupplier,
fixedRefreshIntervalSchedulingEnabled,
recoverySettings,
remoteStoreSettings,
null,
Expand Down Expand Up @@ -1316,7 +1321,11 @@ abstract static class BaseAsyncTask extends AbstractAsyncTask {
protected final IndexService indexService;

BaseAsyncTask(final IndexService indexService, final TimeValue interval) {
super(indexService.logger, indexService.threadPool, interval, true);
this(indexService, interval, () -> Boolean.FALSE);
}

/**
 * Creates the task with auto-reschedule set to {@code true} and the given scheduling-mode supplier,
 * then immediately calls {@code rescheduleIfNecessary()}.
 *
 * @param indexService                   the owning index service; supplies the logger and thread pool
 * @param interval                       the interval between runs
 * @param fixedIntervalSchedulingEnabled forwarded to the parent constructor to select the scheduling mode
 */
BaseAsyncTask(final IndexService indexService, final TimeValue interval, Supplier<Boolean> fixedIntervalSchedulingEnabled) {
super(indexService.logger, indexService.threadPool, interval, true, fixedIntervalSchedulingEnabled);
this.indexService = indexService;
// Must come after the field assignment above: rescheduling happens from inside the constructor.
rescheduleIfNecessary();
}
Expand Down Expand Up @@ -1366,7 +1375,7 @@ public String toString() {
final class AsyncRefreshTask extends BaseAsyncTask {

AsyncRefreshTask(IndexService indexService) {
super(indexService, indexService.getRefreshInterval());
super(indexService, indexService.getRefreshInterval(), fixedRefreshIntervalSchedulingEnabled);
}

@Override
Expand Down
30 changes: 30 additions & 0 deletions server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,17 @@
Property.Dynamic
);

/**
 * Enables fixed interval scheduling for refresh tasks, so that the interval between consecutive
 * refreshes stays consistent. The setting is node-scoped, can be updated dynamically, and defaults
 * to {@code false}.
 */
public static final Setting<Boolean> CLUSTER_REFRESH_FIXED_INTERVAL_SCHEDULE_ENABLED_SETTING = Setting.boolSetting(
"cluster.index.refresh.fixed_interval_scheduling.enabled",
false,
Property.NodeScope,
Property.Dynamic
);

/**
* This setting is used to restrict creation or updation of index where the `index.translog.durability` index setting
* is set as ASYNC if enabled. If disabled, any of the durability mode can be used and switched at any later time from
Expand Down Expand Up @@ -363,6 +374,7 @@
private final IndexStorePlugin.DirectoryFactory remoteDirectoryFactory;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
private volatile TimeValue clusterDefaultRefreshInterval;
private volatile boolean fixedRefreshIntervalSchedulingEnabled;
private final SearchRequestStats searchRequestStats;
private final FileCache fileCache;
private final CompositeIndexSettings compositeIndexSettings;
Expand Down Expand Up @@ -514,6 +526,15 @@
this.clusterDefaultRefreshInterval = CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.get(clusterService.getSettings());
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING, this::onRefreshIntervalUpdate);
this.fixedRefreshIntervalSchedulingEnabled = CLUSTER_REFRESH_FIXED_INTERVAL_SCHEDULE_ENABLED_SETTING.get(
clusterService.getSettings()
);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
CLUSTER_REFRESH_FIXED_INTERVAL_SCHEDULE_ENABLED_SETTING,
this::setFixedRefreshIntervalSchedulingEnabled
);

this.recoverySettings = recoverySettings;
this.remoteStoreSettings = remoteStoreSettings;
this.compositeIndexSettings = compositeIndexSettings;
Expand Down Expand Up @@ -1006,6 +1027,7 @@
remoteDirectoryFactory,
translogFactorySupplier,
this::getClusterDefaultRefreshInterval,
this::isFixedRefreshIntervalSchedulingEnabled,
this.recoverySettings,
this.remoteStoreSettings,
replicator,
Expand Down Expand Up @@ -2167,4 +2189,12 @@
void setMaxSizeInRequestCache(Integer maxSizeInRequestCache) {
this.maxSizeInRequestCache = maxSizeInRequestCache;
}

/**
 * Updates the flag that controls fixed interval scheduling of refresh tasks.
 *
 * @param fixedRefreshIntervalSchedulingEnabled the new value of the flag
 */
public void setFixedRefreshIntervalSchedulingEnabled(boolean fixedRefreshIntervalSchedulingEnabled) {
this.fixedRefreshIntervalSchedulingEnabled = fixedRefreshIntervalSchedulingEnabled;
}

Check warning on line 2195 in server/src/main/java/org/opensearch/indices/IndicesService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/IndicesService.java#L2194-L2195

Added lines #L2194 - L2195 were not covered by tests

/**
 * Returns the current value of the fixed interval scheduling flag for refresh tasks.
 *
 * @return {@code true} if fixed interval scheduling is currently enabled
 */
private boolean isFixedRefreshIntervalSchedulingEnabled() {
return fixedRefreshIntervalSchedulingEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ private IndexService newIndexService(IndexModule module) throws IOException {
new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool, ""),
translogFactorySupplier,
() -> IndexSettings.DEFAULT_REFRESH_INTERVAL,
() -> Boolean.FALSE,
DefaultRecoverySettings.INSTANCE,
DefaultRemoteStoreSettings.INSTANCE,
s -> {},
Expand Down
100 changes: 100 additions & 0 deletions server/src/test/java/org/opensearch/index/IndexServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,106 @@ public void testReplicationTask() throws Exception {
assertEquals(1000, updatedTask.getInterval().millis());
}

/**
 * With fixed interval scheduling disabled, the sleep duration must equal the configured interval
 * both before the first run and after a run has completed.
 */
public void testBaseAsyncTaskWithFixedIntervalDisabled() throws Exception {
    final IndexService service = createIndex("test", Settings.EMPTY);
    final CountDownLatch runCompleted = new CountDownLatch(1);
    try (IndexService.BaseAsyncTask asyncTask = new IndexService.BaseAsyncTask(service, TimeValue.timeValueSeconds(5), () -> Boolean.FALSE) {
        @Override
        protected void runInternal() {
            try {
                Thread.sleep(2000);
                runCompleted.countDown();
            } catch (InterruptedException interrupted) {
                throw new AssertionError(interrupted);
            }
        }
    }) {
        // Before any run: the sleep duration is simply the refresh interval.
        assertEquals(5, asyncTask.getSleepDuration().seconds());

        asyncTask.run();
        runCompleted.await();

        // Capture the duration right after the run; it is asserted once the index has been closed.
        final long secondsAfterRun = asyncTask.getSleepDuration().seconds();
        assertEquals(0, runCompleted.getCount());
        service.close("test", false);
        assertEquals(5, secondsAfterRun);
    }
}

/**
 * With fixed interval scheduling enabled and a run that is shorter than the interval, the sleep
 * duration after the run is the non-zero remainder of the interval, and it eventually drops to zero
 * once the interval has elapsed.
 */
public void testBaseAsyncTaskWithFixedIntervalEnabled() throws Exception {
    IndexService indexService = createIndex("test", Settings.EMPTY);
    CountDownLatch latch = new CountDownLatch(1);
    final TimeValue interval = TimeValue.timeValueSeconds(5);
    try (IndexService.BaseAsyncTask task = new IndexService.BaseAsyncTask(indexService, interval, () -> Boolean.TRUE) {
        @Override
        protected void runInternal() {
            try {
                Thread.sleep(2000);
                latch.countDown();
            } catch (InterruptedException e) {
                throw new AssertionError(e);
            }
        }
    }) {
        // In zero state the sleep duration is drawn at random from [0, interval). A draw below one
        // millisecond is legitimate and truncates to 0 ms, so only the range is asserted here.
        long sleepDurationMs = task.getSleepDuration().millis();
        assertTrue(
            "initial sleep duration out of range: " + sleepDurationMs + "ms",
            sleepDurationMs >= 0 && sleepDurationMs < interval.millis()
        );
        task.run();
        latch.await();
        // Since we have refresh taking up 2s, then the next refresh should have sleep duration of 3s. Here we check
        // the sleep duration to be non-zero since the sleep duration is calculated dynamically.
        sleepDurationMs = task.getSleepDuration().millis();
        assertTrue(sleepDurationMs > 0);
        assertEquals(0, latch.getCount());
        indexService.close("test", false);
        assertBusy(() -> { assertEquals(TimeValue.ZERO, task.getSleepDuration()); });
    }
}

/**
 * With fixed interval scheduling enabled and a run that takes longer than the interval, the next
 * run must be scheduled immediately (sleep duration of zero).
 */
public void testBaseAsyncTaskWithFixedIntervalEnabledAndLongerRefresh() throws Exception {
    IndexService indexService = createIndex("test", Settings.EMPTY);
    CountDownLatch latch = new CountDownLatch(1);
    final TimeValue interval = TimeValue.timeValueSeconds(1);
    try (IndexService.BaseAsyncTask task = new IndexService.BaseAsyncTask(indexService, interval, () -> Boolean.TRUE) {
        @Override
        protected void runInternal() {
            try {
                Thread.sleep(2000);
                latch.countDown();
            } catch (InterruptedException e) {
                throw new AssertionError(e);
            }
        }
    }) {
        // In zero state the sleep duration is drawn at random from [0, interval). A draw below one
        // millisecond is legitimate and truncates to 0 ms, so only the range is asserted here.
        long sleepDurationMs = task.getSleepDuration().millis();
        assertTrue(
            "initial sleep duration out of range: " + sleepDurationMs + "ms",
            sleepDurationMs >= 0 && sleepDurationMs < interval.millis()
        );
        task.run();
        latch.await();
        indexService.close("test", false);
        // Since we have refresh taking up 2s and refresh interval as 1s, then the next refresh should happen immediately.
        sleepDurationMs = task.getSleepDuration().millis();
        assertEquals(0, sleepDurationMs);
        assertEquals(0, latch.getCount());
    }
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder()
Expand Down
Loading