Skip to content

Commit c2fcf39

Browse files
committed
change create rule api to run only on master node to avoid race condition
Signed-off-by: Ruirui Zhang <[email protected]>
1 parent ec2eed9 commit c2fcf39

File tree

10 files changed

+138
-72
lines changed

10 files changed

+138
-72
lines changed

modules/autotagging-commons/common/src/main/java/org/opensearch/rule/CreateRuleRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88

99
package org.opensearch.rule;
1010

11-
import org.opensearch.action.ActionRequest;
1211
import org.opensearch.action.ActionRequestValidationException;
12+
import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest;
1313
import org.opensearch.core.common.io.stream.StreamInput;
1414
import org.opensearch.core.common.io.stream.StreamOutput;
1515
import org.opensearch.rule.autotagging.Rule;
@@ -28,7 +28,7 @@
2828
* }'
2929
* @opensearch.experimental
3030
*/
31-
public class CreateRuleRequest extends ActionRequest {
31+
public class CreateRuleRequest extends ClusterManagerNodeRequest<CreateRuleRequest> {
3232
private final Rule rule;
3333

3434
/**

modules/autotagging-commons/common/src/test/java/org/opensearch/rule/autotagging/AutoTaggingRegistryTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515

1616
import static org.opensearch.rule.autotagging.AutoTaggingRegistry.MAX_FEATURE_TYPE_NAME_LENGTH;
1717
import static org.opensearch.rule.autotagging.RuleTests.INVALID_FEATURE;
18+
import static org.opensearch.rule.utils.RuleTestUtils.FEATURE_TYPE_NAME;
1819
import static org.mockito.Mockito.mock;
1920
import static org.mockito.Mockito.when;
20-
import static org.opensearch.rule.utils.RuleTestUtils.FEATURE_TYPE_NAME;
2121

2222
public class AutoTaggingRegistryTests extends OpenSearchTestCase {
2323

modules/autotagging-commons/src/main/java/org/opensearch/rule/action/TransportCreateRuleAction.java

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,46 +9,86 @@
99
package org.opensearch.rule.action;
1010

1111
import org.opensearch.action.support.ActionFilters;
12-
import org.opensearch.action.support.HandledTransportAction;
12+
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
13+
import org.opensearch.cluster.ClusterState;
14+
import org.opensearch.cluster.block.ClusterBlockException;
15+
import org.opensearch.cluster.block.ClusterBlockLevel;
16+
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
17+
import org.opensearch.cluster.service.ClusterService;
1318
import org.opensearch.common.inject.Inject;
1419
import org.opensearch.core.action.ActionListener;
20+
import org.opensearch.core.common.io.stream.StreamInput;
1521
import org.opensearch.rule.CreateRuleRequest;
1622
import org.opensearch.rule.CreateRuleResponse;
1723
import org.opensearch.rule.RulePersistenceService;
1824
import org.opensearch.rule.RulePersistenceServiceRegistry;
19-
import org.opensearch.tasks.Task;
25+
import org.opensearch.threadpool.ThreadPool;
2026
import org.opensearch.transport.TransportService;
2127

28+
import java.io.IOException;
29+
30+
import static org.opensearch.threadpool.ThreadPool.Names.SAME;
31+
2232
/**
2333
* Transport action to create Rules
2434
* @opensearch.experimental
2535
*/
26-
public class TransportCreateRuleAction extends HandledTransportAction<CreateRuleRequest, CreateRuleResponse> {
36+
public class TransportCreateRuleAction extends TransportClusterManagerNodeAction<CreateRuleRequest, CreateRuleResponse> {
2737

2838
private final RulePersistenceServiceRegistry rulePersistenceServiceRegistry;
2939

3040
/**
3141
* Constructor for TransportCreateRuleAction
3242
*
43+
* @param threadPool - {@link ThreadPool} object
3344
* @param transportService - a {@link TransportService} object
45+
* @param clusterService - a {@link ClusterService} object
3446
* @param actionFilters - a {@link ActionFilters} object
47+
* @param indexNameExpressionResolver - {@link IndexNameExpressionResolver} object
3548
* @param rulePersistenceServiceRegistry - a {@link RulePersistenceServiceRegistry} object
3649
*/
3750
@Inject
3851
public TransportCreateRuleAction(
52+
ThreadPool threadPool,
3953
TransportService transportService,
54+
ClusterService clusterService,
4055
ActionFilters actionFilters,
56+
IndexNameExpressionResolver indexNameExpressionResolver,
4157
RulePersistenceServiceRegistry rulePersistenceServiceRegistry
4258
) {
43-
super(CreateRuleAction.NAME, transportService, actionFilters, CreateRuleRequest::new);
59+
super(
60+
CreateRuleAction.NAME,
61+
transportService,
62+
clusterService,
63+
threadPool,
64+
actionFilters,
65+
CreateRuleRequest::new,
66+
indexNameExpressionResolver
67+
);
4468
this.rulePersistenceServiceRegistry = rulePersistenceServiceRegistry;
4569
}
4670

4771
@Override
48-
protected void doExecute(Task task, CreateRuleRequest request, ActionListener<CreateRuleResponse> listener) {
72+
protected String executor() {
73+
return SAME;
74+
}
75+
76+
@Override
77+
protected CreateRuleResponse read(StreamInput in) throws IOException {
78+
return new CreateRuleResponse(in);
79+
}
80+
81+
@Override
82+
protected void clusterManagerOperation(CreateRuleRequest request, ClusterState state, ActionListener<CreateRuleResponse> listener)
83+
throws Exception {
4984
final RulePersistenceService rulePersistenceService = rulePersistenceServiceRegistry.getRulePersistenceService(
5085
request.getRule().getFeatureType()
5186
);
5287
rulePersistenceService.createRule(request, listener);
5388
}
89+
90+
@Override
91+
protected ClusterBlockException checkBlock(CreateRuleRequest request, ClusterState state) {
92+
return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
93+
}
5494
}

modules/autotagging-commons/src/test/java/org/opensearch/rule/InMemoryRuleProcessingServiceTests.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -136,11 +136,6 @@ public String getName() {
136136
public Map<String, Attribute> getAllowedAttributesRegistry() {
137137
return Map.of("test_attribute", TestAttribute.TEST_ATTRIBUTE);
138138
}
139-
140-
// @Override
141-
// public FeatureValueValidator getFeatureValueValidator() {
142-
// return null;
143-
// }
144139
}
145140

146141
public enum TestAttribute implements Attribute {
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.rule.action;
10+
11+
import org.opensearch.core.common.io.stream.Writeable;
12+
import org.opensearch.test.OpenSearchTestCase;
13+
14+
public class CreateRuleActionTests extends OpenSearchTestCase {
15+
public void testGetName() {
16+
assertEquals("cluster:admin/opensearch/rule/_create", CreateRuleAction.NAME);
17+
}
18+
19+
public void testCreateResponseReader() {
20+
assertTrue(CreateRuleAction.INSTANCE.getResponseReader() instanceof Writeable.Reader);
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.rule.action;
10+
11+
import org.opensearch.action.support.ActionFilters;
12+
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
13+
import org.opensearch.cluster.service.ClusterService;
14+
import org.opensearch.rule.CreateRuleRequest;
15+
import org.opensearch.rule.RulePersistenceService;
16+
import org.opensearch.rule.RulePersistenceServiceRegistry;
17+
import org.opensearch.rule.autotagging.Rule;
18+
import org.opensearch.test.OpenSearchTestCase;
19+
import org.opensearch.threadpool.ThreadPool;
20+
import org.opensearch.transport.TransportService;
21+
22+
import static org.mockito.ArgumentMatchers.any;
23+
import static org.mockito.Mockito.doNothing;
24+
import static org.mockito.Mockito.mock;
25+
import static org.mockito.Mockito.times;
26+
import static org.mockito.Mockito.verify;
27+
import static org.mockito.Mockito.when;
28+
29+
public class TransportCreateRuleActionTests extends OpenSearchTestCase {
30+
TransportCreateRuleAction sut;
31+
32+
public void testExecute() throws Exception {
33+
RulePersistenceServiceRegistry rulePersistenceServiceRegistry = mock(RulePersistenceServiceRegistry.class);
34+
TransportService transportService = mock(TransportService.class);
35+
ActionFilters actionFilters = mock(ActionFilters.class);
36+
RulePersistenceService rulePersistenceService = mock(RulePersistenceService.class);
37+
CreateRuleRequest createRuleRequest = mock(CreateRuleRequest.class);
38+
Rule rule = mock(Rule.class);
39+
when(createRuleRequest.getRule()).thenReturn(rule);
40+
when(createRuleRequest.getRule().getFeatureType()).thenReturn(null);
41+
ThreadPool threadPool = mock(ThreadPool.class);
42+
ClusterService clusterService = mock(ClusterService.class);
43+
IndexNameExpressionResolver indexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
44+
45+
when(rulePersistenceServiceRegistry.getRulePersistenceService(any())).thenReturn(rulePersistenceService);
46+
doNothing().when(rulePersistenceService).getRule(any(), any());
47+
sut = new TransportCreateRuleAction(
48+
threadPool,
49+
transportService,
50+
clusterService,
51+
actionFilters,
52+
indexNameExpressionResolver,
53+
rulePersistenceServiceRegistry
54+
);
55+
sut.clusterManagerOperation(createRuleRequest, null, null);
56+
verify(rulePersistenceService, times(1)).createRule(any(), any());
57+
}
58+
}

plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPlugin.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import org.opensearch.rule.InMemoryRuleProcessingService;
5050
import org.opensearch.rule.RulePersistenceService;
5151
import org.opensearch.rule.autotagging.FeatureType;
52-
import org.opensearch.rule.autotagging.FeatureValueValidator;
5352
import org.opensearch.rule.service.IndexStoredRulePersistenceService;
5453
import org.opensearch.rule.spi.RuleFrameworkExtension;
5554
import org.opensearch.rule.storage.DefaultAttributeValueStore;
@@ -102,14 +101,13 @@ public Collection<Object> createComponents(
102101
IndexNameExpressionResolver indexNameExpressionResolver,
103102
Supplier<RepositoriesService> repositoriesServiceSupplier
104103
) {
105-
FeatureValueValidator validator = WorkloadGroupFeatureValueValidator.getInstance(clusterService);
106-
WorkloadGroupFeatureType.initializeFeatureValueValidator(validator);
104+
FeatureTypeHolder.featureType = new WorkloadGroupFeatureType(new WorkloadGroupFeatureValueValidator(clusterService));
107105
RulePersistenceServiceHolder.rulePersistenceService = new IndexStoredRulePersistenceService(
108106
INDEX_NAME,
109107
client,
110108
clusterService,
111109
MAX_RULES_PER_PAGE,
112-
new XContentRuleParser(WorkloadGroupFeatureType.getInstance()),
110+
new XContentRuleParser(FeatureTypeHolder.featureType),
113111
new IndexBasedRuleQueryMapper()
114112
);
115113
InMemoryRuleProcessingService ruleProcessingService = new InMemoryRuleProcessingService(
@@ -175,10 +173,14 @@ public Supplier<RulePersistenceService> getRulePersistenceServiceSupplier() {
175173

176174
@Override
177175
public FeatureType getFeatureType() {
178-
return WorkloadGroupFeatureType.getInstance();
176+
return FeatureTypeHolder.featureType;
179177
}
180178

181179
static class RulePersistenceServiceHolder {
182180
private static RulePersistenceService rulePersistenceService;
183181
}
182+
183+
static class FeatureTypeHolder {
184+
private static FeatureType featureType;
185+
}
184186
}

plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rule/WorkloadGroupFeatureType.java

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -31,37 +31,15 @@ public class WorkloadGroupFeatureType implements FeatureType {
3131
RuleAttribute.INDEX_PATTERN
3232
);
3333
private final FeatureValueValidator featureValueValidator;
34-
private static WorkloadGroupFeatureType instance;
3534

3635
/**
3736
* constructor for WorkloadGroupFeatureType
3837
* @param featureValueValidator
3938
*/
40-
private WorkloadGroupFeatureType(FeatureValueValidator featureValueValidator) {
39+
public WorkloadGroupFeatureType(FeatureValueValidator featureValueValidator) {
4140
this.featureValueValidator = featureValueValidator;
4241
}
4342

44-
/**
45-
* Initializes the singleton instance of WorkloadGroupFeatureType.
46-
* This method should be called once before calling {@link #getInstance()}.
47-
* @param validator the FeatureValueValidator to be used for initialization
48-
*/
49-
public static void initializeFeatureValueValidator(FeatureValueValidator validator) {
50-
if (instance == null) {
51-
instance = new WorkloadGroupFeatureType(validator);
52-
}
53-
}
54-
55-
/**
56-
* Returns the singleton instance of {@link WorkloadGroupFeatureType}.
57-
*/
58-
public static WorkloadGroupFeatureType getInstance() {
59-
if (instance == null) {
60-
throw new IllegalStateException("FeatureValueValidator is not initialized. Call initializeFeatureValueValidator() first.");
61-
}
62-
return instance;
63-
}
64-
6543
@Override
6644
public String getName() {
6745
return NAME;

plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rule/WorkloadGroupFeatureValueValidator.java

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,33 +20,16 @@
2020
*/
2121
public class WorkloadGroupFeatureValueValidator implements FeatureValueValidator {
2222
private final ClusterService clusterService;
23-
private static volatile WorkloadGroupFeatureValueValidator instance;
2423
private final Logger logger = LogManager.getLogger(WorkloadGroupFeatureValueValidator.class);
2524

2625
/**
2726
* constructor for WorkloadGroupFeatureValueValidator
2827
* @param clusterService
2928
*/
30-
private WorkloadGroupFeatureValueValidator(ClusterService clusterService) {
29+
public WorkloadGroupFeatureValueValidator(ClusterService clusterService) {
3130
this.clusterService = clusterService;
3231
}
3332

34-
/**
35-
* Returns the singleton instance of {@code WorkloadGroupFeatureValueValidator}, initializing it if necessary.
36-
* Uses double-checked locking to ensure thread-safe lazy initialization.
37-
* @param clusterService the {@link ClusterService} used to construct the validator if not already initialized
38-
*/
39-
public static WorkloadGroupFeatureValueValidator getInstance(ClusterService clusterService) {
40-
if (instance == null) {
41-
synchronized (WorkloadGroupFeatureValueValidator.class) {
42-
if (instance == null) {
43-
instance = new WorkloadGroupFeatureValueValidator(clusterService);
44-
}
45-
}
46-
}
47-
return instance;
48-
}
49-
5033
@Override
5134
public void validate(String featureValue) {
5235
if (!clusterService.state().metadata().workloadGroups().containsKey(featureValue)) {

plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/rule/WorkloadGroupFeatureTypeTests.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,27 +8,15 @@
88

99
package org.opensearch.plugin.wlm.rule;
1010

11-
import org.opensearch.cluster.service.ClusterService;
1211
import org.opensearch.rule.RuleAttribute;
1312
import org.opensearch.rule.autotagging.Attribute;
1413
import org.opensearch.rule.autotagging.AutoTaggingRegistry;
15-
import org.opensearch.rule.autotagging.FeatureValueValidator;
1614
import org.opensearch.test.OpenSearchTestCase;
17-
import org.junit.Before;
1815

1916
import java.util.Map;
2017

21-
import static org.mockito.Mockito.mock;
22-
2318
public class WorkloadGroupFeatureTypeTests extends OpenSearchTestCase {
24-
WorkloadGroupFeatureType featureType;
25-
26-
@Before
27-
public void setUpFeatureType() {
28-
FeatureValueValidator validator = WorkloadGroupFeatureValueValidator.getInstance(mock(ClusterService.class));
29-
WorkloadGroupFeatureType.initializeFeatureValueValidator(validator);
30-
featureType = WorkloadGroupFeatureType.getInstance();
31-
}
19+
WorkloadGroupFeatureType featureType = new WorkloadGroupFeatureType(new WorkloadGroupFeatureValueValidator(null));
3220

3321
public void testGetName_returnsCorrectName() {
3422
assertEquals("workload_group", featureType.getName());

0 commit comments

Comments
 (0)