Skip to content

Commit cfde69c

Browse files
committed
customize transport layer to route to primary shard's node
Signed-off-by: Ruirui Zhang <[email protected]>
1 parent c2fcf39 commit cfde69c

File tree

12 files changed

+401
-172
lines changed

12 files changed

+401
-172
lines changed

modules/autotagging-commons/common/src/main/java/org/opensearch/rule/CreateRuleRequest.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88

99
package org.opensearch.rule;
1010

11+
import org.opensearch.action.ActionRequest;
1112
import org.opensearch.action.ActionRequestValidationException;
12-
import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest;
1313
import org.opensearch.core.common.io.stream.StreamInput;
1414
import org.opensearch.core.common.io.stream.StreamOutput;
1515
import org.opensearch.rule.autotagging.Rule;
@@ -19,16 +19,15 @@
1919
/**
2020
* A request for create Rule
2121
* Example request:
22-
* Note that the endpoint below is for wlm rules specifically and serves only as an example
23-
* curl -XPUT "localhost:9200/_wlm/rule/" -H 'Content-Type: application/json' -d '
22+
* curl -X PUT "localhost:9200/_rules/{featureType}/" -H 'Content-Type: application/json' -d '
2423
* {
2524
* "description": "description1",
26-
* "index_pattern": ["log*", "event*"],
27-
* "workload_group": "poOiU851RwyLYvV5lbvv5w"
25+
* "attribute_name": ["log*", "event*"],
26+
* "feature_type": "poOiU851RwyLYvV5lbvv5w"
2827
* }'
2928
* @opensearch.experimental
3029
*/
31-
public class CreateRuleRequest extends ClusterManagerNodeRequest<CreateRuleRequest> {
30+
public class CreateRuleRequest extends ActionRequest {
3231
private final Rule rule;
3332

3433
/**

modules/autotagging-commons/common/src/main/java/org/opensearch/rule/GetRuleRequest.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,9 @@
2626
/**
2727
* A request for get Rule
2828
* Example Request:
29-
* The endpoint "localhost:9200/_wlm/rule" is specific to the Workload Management feature to manage rules
30-
* curl -X GET "localhost:9200/_wlm/rule" - get all rules
31-
* curl -X GET "localhost:9200/_wlm/rule/{_id}" - get single rule by id
32-
* curl -X GET "localhost:9200/_wlm/rule?index_pattern=a,b" - get all rules containing attribute index_pattern as a or b
29+
* curl -X GET "localhost:9200/_rules/{featureType}/" - get all rules for {featureType}
30+
* curl -X GET "localhost:9200/_rules/{featureType}/{_id}" - get single rule by id
31+
* curl -X GET "localhost:9200/_rules/{featureType}?index_pattern=a,b" - get all rules containing attribute index_pattern as a or b for {featureType}
3332
* @opensearch.experimental
3433
*/
3534
@ExperimentalApi

modules/autotagging-commons/common/src/main/java/org/opensearch/rule/service/IndexStoredRulePersistenceService.java

Lines changed: 18 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -10,29 +10,22 @@
1010

1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
13-
import org.opensearch.ExceptionsHelper;
14-
import org.opensearch.ResourceAlreadyExistsException;
1513
import org.opensearch.ResourceNotFoundException;
1614
import org.opensearch.action.DocWriteResponse;
1715
import org.opensearch.action.delete.DeleteRequest;
18-
import org.opensearch.action.search.SearchRequestBuilder;
19-
import org.opensearch.action.support.clustermanager.AcknowledgedResponse;
20-
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
21-
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
2216
import org.opensearch.action.index.IndexRequest;
2317
import org.opensearch.action.search.SearchRequestBuilder;
18+
import org.opensearch.action.support.clustermanager.AcknowledgedResponse;
2419
import org.opensearch.cluster.service.ClusterService;
2520
import org.opensearch.common.util.concurrent.ThreadContext;
2621
import org.opensearch.common.xcontent.XContentFactory;
2722
import org.opensearch.core.action.ActionListener;
28-
import org.opensearch.index.engine.DocumentMissingException;
29-
import org.opensearch.index.query.QueryBuilder;
30-
import org.opensearch.index.query.QueryBuilders;
31-
import org.opensearch.rule.DeleteRuleRequest;
3223
import org.opensearch.core.xcontent.ToXContent;
24+
import org.opensearch.index.engine.DocumentMissingException;
3325
import org.opensearch.index.query.QueryBuilder;
3426
import org.opensearch.rule.CreateRuleRequest;
3527
import org.opensearch.rule.CreateRuleResponse;
28+
import org.opensearch.rule.DeleteRuleRequest;
3629
import org.opensearch.rule.GetRuleRequest;
3730
import org.opensearch.rule.GetRuleResponse;
3831
import org.opensearch.rule.RuleEntityParser;
@@ -67,9 +60,7 @@ public class IndexStoredRulePersistenceService implements RulePersistenceService
6760
private final int maxRulesPerPage;
6861
private final RuleEntityParser parser;
6962
private final RuleQueryMapper<QueryBuilder> queryBuilder;
70-
private final Object lock = new Object();
7163
private static final Logger logger = LogManager.getLogger(IndexStoredRulePersistenceService.class);
72-
private static final Map<String, Object> indexSettings = Map.of("index.number_of_shards", 1, "index.auto_expand_replicas", "0-all");
7364

7465
/**
7566
* Constructs an instance of {@link IndexStoredRulePersistenceService} with the specified parameters.
@@ -106,66 +97,21 @@ public IndexStoredRulePersistenceService(
10697
public void createRule(CreateRuleRequest request, ActionListener<CreateRuleResponse> listener) {
10798
try (ThreadContext.StoredContext ctx = getContext()) {
10899
if (!clusterService.state().metadata().hasIndex(indexName)) {
109-
createIndex(listener, request.getRule());
100+
logger.error("Index {} does not exist", indexName);
101+
throw new IllegalStateException("Index" + indexName + " does not exist");
110102
} else {
111-
synchronized (lock) {
112-
validateAndPersist(request.getRule(), listener);
113-
}
103+
Rule rule = request.getRule();
104+
validateNoDuplicateRule(rule, ActionListener.wrap(unused -> persistRule(rule, listener), listener::onFailure));
114105
}
115106
}
116107
}
117108

118-
/**
119-
* Creates the system index, then validates and persists the given rule.
120-
* @param listener - ActionListener for CreateRuleResponse
121-
* @param rule - the rule to validate and persist
122-
*/
123-
private void createIndex(ActionListener<CreateRuleResponse> listener, Rule rule) {
124-
final CreateIndexRequest request = new CreateIndexRequest(indexName).settings(indexSettings);
125-
client.admin().indices().create(request, new ActionListener<>() {
126-
@Override
127-
public void onResponse(CreateIndexResponse response) {
128-
if (!response.isAcknowledged()) {
129-
logger.error("Index creation not acknowledged: {}", indexName);
130-
listener.onFailure(new IllegalStateException(indexName + " index creation failed and rule cannot be persisted"));
131-
} else {
132-
synchronized (lock) {
133-
validateAndPersist(rule, listener);
134-
}
135-
}
136-
}
137-
138-
@Override
139-
public void onFailure(Exception e) {
140-
Throwable cause = ExceptionsHelper.unwrapCause(e);
141-
if (cause instanceof ResourceAlreadyExistsException) {
142-
synchronized (lock) {
143-
validateAndPersist(rule, listener);
144-
}
145-
} else {
146-
logger.error("Failed to create index {}: {}", indexName, e.getMessage());
147-
listener.onFailure(e);
148-
}
149-
}
150-
});
151-
}
152-
153-
/**
154-
* Validates that the rule does not already exist, then persists it if validation succeeds
155-
* @param rule - The rule to validate and persist
156-
* @param listener - ActionListener for CreateRuleResponse
157-
*/
158-
private void validateAndPersist(Rule rule, ActionListener<CreateRuleResponse> listener) {
159-
validateNoDuplicateRule(rule, ActionListener.wrap(unused -> persistRule(rule, listener), listener::onFailure));
160-
}
161-
162109
/**
163110
* Validates that no duplicate rule exists with the same attribute map.
164111
* If a conflict is found, fails the listener
165112
* @param rule - the rule we check duplicate against
166113
* @param listener - listener for validateNoDuplicateRule response
167114
*/
168-
169115
private void validateNoDuplicateRule(Rule rule, ActionListener<Void> listener) {
170116
try (ThreadContext.StoredContext ctx = getContext()) {
171117
QueryBuilder query = queryBuilder.from(new GetRuleRequest(null, rule.getAttributeMap(), null, rule.getFeatureType()));
@@ -266,10 +212,6 @@ void handleGetRuleResponse(List<SearchHit> hits, ActionListener<GetRuleResponse>
266212
listener.onResponse(new GetRuleResponse(ruleMap, nextSearchAfter));
267213
}
268214

269-
private ThreadContext.StoredContext getContext() {
270-
return client.threadPool().getThreadContext().stashContext();
271-
}
272-
273215
@Override
274216
public void deleteRule(DeleteRuleRequest request, ActionListener<AcknowledgedResponse> listener) {
275217
try (ThreadContext.StoredContext context = getContext()) {
@@ -291,4 +233,15 @@ public void deleteRule(DeleteRuleRequest request, ActionListener<AcknowledgedRes
291233
}));
292234
}
293235
}
236+
237+
/**
238+
* indexName getter
239+
*/
240+
public String getIndexName() {
241+
return indexName;
242+
}
243+
244+
private ThreadContext.StoredContext getContext() {
245+
return client.threadPool().getThreadContext().stashContext();
246+
}
294247
}

modules/autotagging-commons/common/src/test/java/org/opensearch/rule/service/IndexStoredRulePersistenceServiceTests.java

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import org.opensearch.ResourceNotFoundException;
1313
import org.opensearch.action.DocWriteResponse;
1414
import org.opensearch.action.delete.DeleteRequest;
15+
import org.opensearch.action.index.IndexRequest;
16+
import org.opensearch.action.index.IndexResponse;
1517
import org.opensearch.action.search.SearchRequestBuilder;
1618
import org.opensearch.action.search.SearchResponse;
1719
import org.opensearch.action.support.clustermanager.AcknowledgedResponse;
@@ -25,6 +27,8 @@
2527
import org.opensearch.core.index.shard.ShardId;
2628
import org.opensearch.index.engine.DocumentMissingException;
2729
import org.opensearch.index.query.QueryBuilder;
30+
import org.opensearch.rule.CreateRuleRequest;
31+
import org.opensearch.rule.CreateRuleResponse;
2832
import org.opensearch.rule.DeleteRuleRequest;
2933
import org.opensearch.rule.GetRuleRequest;
3034
import org.opensearch.rule.GetRuleResponse;
@@ -39,11 +43,16 @@
3943
import org.opensearch.threadpool.ThreadPool;
4044
import org.opensearch.transport.client.Client;
4145

46+
import java.io.IOException;
4247
import java.util.HashMap;
48+
import java.util.Map;
49+
import java.util.Set;
4350

4451
import org.mockito.ArgumentCaptor;
4552

4653
import static org.opensearch.rule.XContentRuleParserTests.VALID_JSON;
54+
import static org.opensearch.rule.utils.RuleTestUtils.ATTRIBUTE_VALUE_ONE;
55+
import static org.opensearch.rule.utils.RuleTestUtils.MockRuleAttributes.MOCK_RULE_ATTRIBUTE_ONE;
4756
import static org.opensearch.rule.utils.RuleTestUtils.TEST_INDEX_NAME;
4857
import static org.opensearch.rule.utils.RuleTestUtils._ID_ONE;
4958
import static org.mockito.ArgumentMatchers.any;
@@ -61,6 +70,111 @@ public class IndexStoredRulePersistenceServiceTests extends OpenSearchTestCase {
6170

6271
public static final int MAX_VALUES_PER_PAGE = 50;
6372

73+
public void testCreateRuleOnExistingIndex() throws IOException {
74+
CreateRuleRequest createRuleRequest = mock(CreateRuleRequest.class);
75+
Rule mockRule = mock(Rule.class);
76+
when(createRuleRequest.getRule()).thenReturn(mockRule);
77+
RuleQueryMapper<QueryBuilder> mockRuleQueryMapper = mock(RuleQueryMapper.class);
78+
RuleEntityParser mockRuleEntityParser = mock(RuleEntityParser.class);
79+
ClusterService clusterService = mock(ClusterService.class);
80+
ClusterState clusterState = mock(ClusterState.class);
81+
Metadata metadata = mock(Metadata.class);
82+
QueryBuilder queryBuilder = mock(QueryBuilder.class);
83+
84+
when(clusterService.state()).thenReturn(clusterState);
85+
when(clusterState.metadata()).thenReturn(metadata);
86+
when(metadata.hasIndex(TEST_INDEX_NAME)).thenReturn(true);
87+
when(mockRuleQueryMapper.from(any(GetRuleRequest.class))).thenReturn(queryBuilder);
88+
when(queryBuilder.filter(any())).thenReturn(queryBuilder);
89+
90+
SearchRequestBuilder searchRequestBuilder = mock(SearchRequestBuilder.class);
91+
Client client = setUpMockClient(searchRequestBuilder);
92+
RulePersistenceService rulePersistenceService = new IndexStoredRulePersistenceService(
93+
TEST_INDEX_NAME,
94+
client,
95+
clusterService,
96+
MAX_VALUES_PER_PAGE,
97+
mockRuleEntityParser,
98+
mockRuleQueryMapper
99+
);
100+
ActionListener<CreateRuleResponse> listener = mock(ActionListener.class);
101+
when(mockRule.toXContent(any(), any())).thenAnswer(invocation -> invocation.getArgument(0));
102+
103+
SearchResponse searchResponse = mock(SearchResponse.class);
104+
when(searchResponse.getHits()).thenReturn(new SearchHits(new SearchHit[] {}, new TotalHits(0, TotalHits.Relation.EQUAL_TO), 1.0f));
105+
doAnswer((invocation) -> {
106+
ActionListener<SearchResponse> actionListener = invocation.getArgument(0);
107+
actionListener.onResponse(searchResponse);
108+
return null;
109+
}).when(searchRequestBuilder).execute(any(ActionListener.class));
110+
111+
IndexResponse indexResponse = mock(IndexResponse.class);
112+
when(indexResponse.getId()).thenReturn(_ID_ONE);
113+
doAnswer(invocation -> {
114+
ActionListener<IndexResponse> actionListener = invocation.getArgument(1);
115+
actionListener.onResponse(indexResponse);
116+
return null;
117+
}).when(client).index(any(IndexRequest.class), any(ActionListener.class));
118+
119+
rulePersistenceService.createRule(createRuleRequest, listener);
120+
ArgumentCaptor<CreateRuleResponse> responseCaptor = ArgumentCaptor.forClass(CreateRuleResponse.class);
121+
verify(listener).onResponse(responseCaptor.capture());
122+
CreateRuleResponse response = responseCaptor.getValue();
123+
assertNotNull(response.getRule());
124+
}
125+
126+
public void testCreateDuplicateRule() {
127+
CreateRuleRequest createRuleRequest = mock(CreateRuleRequest.class);
128+
Rule mockRule = mock(Rule.class);
129+
when(createRuleRequest.getRule()).thenReturn(mockRule);
130+
131+
RuleQueryMapper<QueryBuilder> mockRuleQueryMapper = mock(RuleQueryMapper.class);
132+
RuleEntityParser mockRuleEntityParser = mock(RuleEntityParser.class);
133+
ClusterService clusterService = mock(ClusterService.class);
134+
ClusterState clusterState = mock(ClusterState.class);
135+
Metadata metadata = mock(Metadata.class);
136+
QueryBuilder queryBuilder = mock(QueryBuilder.class);
137+
138+
when(clusterService.state()).thenReturn(clusterState);
139+
when(clusterState.metadata()).thenReturn(metadata);
140+
when(metadata.hasIndex(TEST_INDEX_NAME)).thenReturn(true);
141+
when(mockRuleQueryMapper.from(any(GetRuleRequest.class))).thenReturn(queryBuilder);
142+
when(queryBuilder.filter(any())).thenReturn(queryBuilder);
143+
144+
when(mockRule.getAttributeMap()).thenReturn(Map.of(MOCK_RULE_ATTRIBUTE_ONE, Set.of(ATTRIBUTE_VALUE_ONE)));
145+
when(mockRule.getFeatureType()).thenReturn(RuleTestUtils.MockRuleFeatureType.INSTANCE);
146+
147+
SearchRequestBuilder searchRequestBuilder = mock(SearchRequestBuilder.class);
148+
Client client = setUpMockClient(searchRequestBuilder);
149+
150+
RulePersistenceService rulePersistenceService = new IndexStoredRulePersistenceService(
151+
TEST_INDEX_NAME,
152+
client,
153+
clusterService,
154+
MAX_VALUES_PER_PAGE,
155+
mockRuleEntityParser,
156+
mockRuleQueryMapper
157+
);
158+
159+
SearchResponse searchResponse = mock(SearchResponse.class);
160+
SearchHit hit = new SearchHit(1);
161+
hit.sourceRef(new BytesArray(VALID_JSON));
162+
SearchHits searchHits = new SearchHits(new SearchHit[] { hit }, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0f);
163+
when(searchResponse.getHits()).thenReturn(searchHits);
164+
165+
doAnswer((invocation) -> {
166+
ActionListener<SearchResponse> actionListener = invocation.getArgument(0);
167+
actionListener.onResponse(searchResponse);
168+
return null;
169+
}).when(searchRequestBuilder).execute(any(ActionListener.class));
170+
171+
ActionListener<CreateRuleResponse> listener = mock(ActionListener.class);
172+
when(mockRuleEntityParser.parse(any(String.class))).thenReturn(mockRule);
173+
rulePersistenceService.createRule(createRuleRequest, listener);
174+
ArgumentCaptor<Exception> failureCaptor = ArgumentCaptor.forClass(Exception.class);
175+
verify(listener).onFailure(failureCaptor.capture());
176+
}
177+
64178
public void testGetRuleByIdSuccess() {
65179
GetRuleRequest getRuleRequest = mock(GetRuleRequest.class);
66180
when(getRuleRequest.getId()).thenReturn(_ID_ONE);
@@ -76,10 +190,12 @@ public void testGetRuleByIdSuccess() {
76190

77191
SearchRequestBuilder searchRequestBuilder = mock(SearchRequestBuilder.class);
78192
Client client = setUpMockClient(searchRequestBuilder);
193+
ClusterService clusterService = mock(ClusterService.class);
79194

80195
RulePersistenceService rulePersistenceService = new IndexStoredRulePersistenceService(
81196
TEST_INDEX_NAME,
82197
client,
198+
clusterService,
83199
MAX_VALUES_PER_PAGE,
84200
mockRuleEntityParser,
85201
mockRuleQueryMapper
@@ -122,10 +238,12 @@ public void testGetRuleByIdNotFound() {
122238

123239
SearchRequestBuilder searchRequestBuilder = mock(SearchRequestBuilder.class);
124240
Client client = setUpMockClient(searchRequestBuilder);
241+
ClusterService clusterService = mock(ClusterService.class);
125242

126243
RulePersistenceService rulePersistenceService = new IndexStoredRulePersistenceService(
127244
TEST_INDEX_NAME,
128245
client,
246+
clusterService,
129247
MAX_VALUES_PER_PAGE,
130248
mockRuleEntityParser,
131249
mockRuleQueryMapper
@@ -176,13 +294,15 @@ public void testDeleteRule_successful() {
176294
DeleteRuleRequest request = new DeleteRuleRequest(ruleId, RuleTestUtils.MockRuleFeatureType.INSTANCE);
177295

178296
Client client = mock(Client.class);
297+
ClusterService clusterService = mock(ClusterService.class);
179298
ThreadPool threadPool = mock(ThreadPool.class);
180299
when(client.threadPool()).thenReturn(threadPool);
181300
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
182301

183302
RulePersistenceService rulePersistenceService = new IndexStoredRulePersistenceService(
184303
TEST_INDEX_NAME,
185304
client,
305+
clusterService,
186306
MAX_VALUES_PER_PAGE,
187307
mock(RuleEntityParser.class),
188308
mock(RuleQueryMapper.class)
@@ -214,13 +334,15 @@ public void testDeleteRule_notFound() {
214334
DeleteRuleRequest request = new DeleteRuleRequest(ruleId, RuleTestUtils.MockRuleFeatureType.INSTANCE);
215335

216336
Client client = mock(Client.class);
337+
ClusterService clusterService = mock(ClusterService.class);
217338
ThreadPool threadPool = mock(ThreadPool.class);
218339
when(client.threadPool()).thenReturn(threadPool);
219340
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
220341

221342
RulePersistenceService rulePersistenceService = new IndexStoredRulePersistenceService(
222343
TEST_INDEX_NAME,
223344
client,
345+
clusterService,
224346
MAX_VALUES_PER_PAGE,
225347
mock(RuleEntityParser.class),
226348
mock(RuleQueryMapper.class)

0 commit comments

Comments
 (0)