Skip to content

Disabling _close API invocation during remote migration. #18327

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Security Manager Replacement] Enhance Java Agent to intercept newByteChannel ([#17989](https://github.com/opensearch-project/OpenSearch/pull/17989))
- Enabled Async Shard Batch Fetch by default ([#18139](https://github.com/opensearch-project/OpenSearch/pull/18139))
- Allow to get the search request from the QueryCoordinatorContext ([#17818](https://github.com/opensearch-project/OpenSearch/pull/17818))
- Reject close index requests, while remote store migration is in progress.([#18327](https://github.com/opensearch-project/OpenSearch/pull/18327))
- Improve sort-query performance by retaining the default `totalHitsThreshold` for approximated `match_all` queries ([#18189](https://github.com/opensearch-project/OpenSearch/pull/18189))
- Enable testing for ExtensiblePlugins using classpath plugins ([#16908](https://github.com/opensearch-project/OpenSearch/pull/16908))
- Introduce system generated ingest pipeline ([#17817](https://github.com/opensearch-project/OpenSearch/pull/17817)))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotemigration;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.indices.close.CloseIndexRequest;
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.MetadataIndexStateService;
import org.opensearch.common.settings.Settings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.concurrent.ExecutionException;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class CloseIndexMigrationTestCase extends MigrationBaseTestCase {
private static final String TEST_INDEX = "ind";
private final static String REMOTE_STORE_DIRECTION = "remote_store";
private final static String MIXED_MODE = "mixed";

/*
* This test will verify the close request failure, when cluster mode is mixed
* and migration to remote store is in progress.
* */
public void testFailCloseIndexWhileDocRepToRemoteStoreMigration() {
setAddRemote(false);
// create a docrep cluster
internalCluster().startClusterManagerOnlyNode();
internalCluster().validateClusterFormed();

// add a non-remote node
String nonRemoteNodeName = internalCluster().startDataOnlyNode();
internalCluster().validateClusterFormed();

// create index in cluster
Settings.Builder builder = Settings.builder().put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT);
internalCluster().client()
.admin()
.indices()
.prepareCreate(TEST_INDEX)
.setSettings(
builder.put("index.number_of_shards", 2)
.put("index.number_of_replicas", 0)
.put("index.routing.allocation.include._name", nonRemoteNodeName)
)
.setWaitForActiveShards(ActiveShardCount.ALL)
.execute()
.actionGet();

// set mixed mode
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), MIXED_MODE));
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

// add a remote node
addRemote = true;
internalCluster().startDataOnlyNode();
internalCluster().validateClusterFormed();

// set remote store migration direction
updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), REMOTE_STORE_DIRECTION));
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

ensureGreen(TEST_INDEX);

// Try closing the index, expecting failure.
ExecutionException ex = expectThrows(
ExecutionException.class,
() -> internalCluster().client().admin().indices().close(new CloseIndexRequest(TEST_INDEX)).get()

);
assertEquals("Cannot close index while remote migration is ongoing", ex.getCause().getMessage());
}

/*
* Verify that index closes if compatibility mode is MIXED, and direction is set to NONE
* */
public void testCloseIndexRequestWithMixedCompatibilityModeAndNoDirection() {
setAddRemote(false);
// create a docrep cluster
internalCluster().startClusterManagerOnlyNode();
internalCluster().validateClusterFormed();

// add a non-remote node
String nonRemoteNodeName = internalCluster().startDataOnlyNode();
internalCluster().validateClusterFormed();

// create index in cluster
Settings.Builder builder = Settings.builder().put(SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT);
internalCluster().client()
.admin()
.indices()
.prepareCreate(TEST_INDEX)
.setSettings(
builder.put("index.number_of_shards", 2)
.put("index.number_of_replicas", 0)
.put("index.routing.allocation.include._name", nonRemoteNodeName)
)
.setWaitForActiveShards(ActiveShardCount.ALL)
.execute()
.actionGet();

// set mixed mode
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), MIXED_MODE));
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

ensureGreen(TEST_INDEX);

// perform close action
assertAcked(internalCluster().client().admin().indices().close(new CloseIndexRequest(TEST_INDEX)).actionGet());

// verify that index has been closed
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();

final IndexMetadata indexMetadata = clusterState.metadata().indices().get(TEST_INDEX);
assertEquals(IndexMetadata.State.CLOSE, indexMetadata.getState());
final Settings indexSettings = indexMetadata.getSettings();
assertTrue(indexSettings.hasValue(MetadataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING.getKey()));
assertEquals(true, indexSettings.getAsBoolean(MetadataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING.getKey(), false));
assertNotNull(clusterState.routingTable().index(TEST_INDEX));
assertTrue(clusterState.blocks().hasIndexBlock(TEST_INDEX, MetadataIndexStateService.INDEX_CLOSED_BLOCK));

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.index.Index;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode;
import org.opensearch.node.remotestore.RemoteStoreNodeService.Direction;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -130,6 +133,7 @@ protected void doExecute(Task task, CloseIndexRequest request, ActionListener<Cl
+ ": true] to enable it. NOTE: closed indices still consume a significant amount of diskspace"
);
}
validateRemoteMigration();
super.doExecute(task, request, listener);
}

Expand Down Expand Up @@ -172,4 +176,17 @@ protected void clusterManagerOperation(
delegatedListener.onFailure(t);
}));
}

/**
* Reject close index request if cluster mode is [MIXED] and migration direction is [RemoteStore]
* @throws IllegalStateException if cluster mode is [MIXED] and migration direction is [RemoteStore]
*/
private void validateRemoteMigration() {
ClusterSettings clusterSettings = clusterService.getClusterSettings();
CompatibilityMode compatibilityMode = clusterSettings.get(RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING);
Direction migrationDirection = clusterSettings.get(RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING);
if (compatibilityMode == CompatibilityMode.MIXED && migrationDirection == Direction.REMOTE_STORE) {
throw new IllegalStateException("Cannot close index while remote migration is ongoing");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.close;

import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.DestructiveOperations;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.MetadataIndexStateService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;

import java.util.concurrent.TimeUnit;

import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TransportCloseIndexActionTests extends OpenSearchTestCase {
private static ThreadPool threadPool;
private ClusterService clusterService;
private final static String MIXED_MODE = "mixed";
private final static String REMOTE_STORE_DIRECTION = "remote_store";
private ClusterSettings clusterSettings;
private final static String TEST_IND = "ind";

@BeforeClass
public static void beforeClass() {
threadPool = new TestThreadPool(getTestClass().getName());
}

@Override
@Before
public void setUp() throws Exception {
super.setUp();

clusterService = mock(ClusterService.class);
Settings settings = Settings.builder()
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), MIXED_MODE)
.put(MIGRATION_DIRECTION_SETTING.getKey(), REMOTE_STORE_DIRECTION)
.build();
ClusterSettings clusSet = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
when(clusterService.getClusterSettings()).thenReturn(clusSet);
clusterSettings = clusterService.getClusterSettings();
}

@Override
@After
public void tearDown() throws Exception {
super.tearDown();
clusterService.close();
}

@AfterClass
public static void afterClass() {
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
threadPool = null;
}

private TransportCloseIndexAction createAction() {
return new TransportCloseIndexAction(
Settings.EMPTY,
mock(TransportService.class),
clusterService,
threadPool,
mock(MetadataIndexStateService.class),
clusterSettings,
mock(ActionFilters.class),
mock(IndexNameExpressionResolver.class),
new DestructiveOperations(Settings.EMPTY, clusterSettings)
);
}

// Test if validateRemoteMigration throws illegal exception when compatibility mode is MIXED and migration Direction is REMOTE_STORE
public void testRemoteValidation() {
TransportCloseIndexAction action = createAction();

Exception e = expectThrows(IllegalStateException.class, () -> action.doExecute(null, new CloseIndexRequest(TEST_IND), null));

assertEquals("Cannot close index while remote migration is ongoing", e.getMessage());
}
}
Loading