14
14
import org .opensearch .action .DocWriteResponse ;
15
15
import org .opensearch .action .delete .DeleteRequest ;
16
16
import org .opensearch .action .index .IndexRequest ;
17
+ import org .opensearch .action .index .IndexResponse ;
17
18
import org .opensearch .action .search .SearchRequestBuilder ;
19
+ import org .opensearch .action .search .SearchResponse ;
18
20
import org .opensearch .action .support .clustermanager .AcknowledgedResponse ;
19
21
import org .opensearch .cluster .service .ClusterService ;
20
22
import org .opensearch .common .util .concurrent .ThreadContext ;
37
39
import org .opensearch .search .sort .SortOrder ;
38
40
import org .opensearch .transport .client .Client ;
39
41
40
- import java .io .IOException ;
41
42
import java .util .Arrays ;
42
43
import java .util .List ;
43
44
import java .util .Map ;
@@ -95,10 +96,10 @@ public IndexStoredRulePersistenceService(
95
96
* @param listener ActionListener for CreateRuleResponse
96
97
*/
97
98
public void createRule (CreateRuleRequest request , ActionListener <CreateRuleResponse > listener ) {
98
- try (ThreadContext .StoredContext ctx = getContext ()) {
99
+ try (ThreadContext .StoredContext ctx = stashContext ()) {
99
100
if (!clusterService .state ().metadata ().hasIndex (indexName )) {
100
101
logger .error ("Index {} does not exist" , indexName );
101
- throw new IllegalStateException ("Index" + indexName + " does not exist" );
102
+ listener . onFailure ( new IllegalStateException ("Index" + indexName + " does not exist" ) );
102
103
} else {
103
104
Rule rule = request .getRule ();
104
105
validateNoDuplicateRule (rule , ActionListener .wrap (unused -> persistRule (rule , listener ), listener ::onFailure ));
@@ -107,30 +108,28 @@ public void createRule(CreateRuleRequest request, ActionListener<CreateRuleRespo
107
108
}
108
109
109
110
/**
110
- * Validates that no duplicate rule exists with the same attribute map.
111
- * If a conflict is found, fails the listener
111
+ * Validates that no existing rule has the same attribute map as the given rule .
112
+ * This validation must be performed one at a time to prevent writing duplicate rules.
112
113
* @param rule - the rule we check duplicate against
113
114
* @param listener - listener for validateNoDuplicateRule response
114
115
*/
115
116
private void validateNoDuplicateRule (Rule rule , ActionListener <Void > listener ) {
116
- try (ThreadContext .StoredContext ctx = getContext ()) {
117
- QueryBuilder query = queryBuilder .from (new GetRuleRequest (null , rule .getAttributeMap (), null , rule .getFeatureType ()));
118
- getRuleFromIndex (null , query , null , new ActionListener <>() {
119
- @ Override
120
- public void onResponse (GetRuleResponse getRuleResponse ) {
121
- Optional <String > duplicateRuleId = RuleUtils .getDuplicateRuleId (rule , getRuleResponse .getRules ());
122
- duplicateRuleId .ifPresentOrElse (
123
- id -> listener .onFailure (new IllegalArgumentException ("Duplicate rule exists under id " + id )),
124
- () -> listener .onResponse (null )
125
- );
126
- }
117
+ QueryBuilder query = queryBuilder .from (new GetRuleRequest (null , rule .getAttributeMap (), null , rule .getFeatureType ()));
118
+ getRuleFromIndex (null , query , null , new ActionListener <>() {
119
+ @ Override
120
+ public void onResponse (GetRuleResponse getRuleResponse ) {
121
+ Optional <String > duplicateRuleId = RuleUtils .getDuplicateRuleId (rule , getRuleResponse .getRules ());
122
+ duplicateRuleId .ifPresentOrElse (
123
+ id -> listener .onFailure (new IllegalArgumentException ("Duplicate rule exists under id " + id )),
124
+ () -> listener .onResponse (null )
125
+ );
126
+ }
127
127
128
- @ Override
129
- public void onFailure (Exception e ) {
130
- listener .onFailure (e );
131
- }
132
- });
133
- }
128
+ @ Override
129
+ public void onFailure (Exception e ) {
130
+ listener .onFailure (e );
131
+ }
132
+ });
134
133
}
135
134
136
135
/**
@@ -139,17 +138,13 @@ public void onFailure(Exception e) {
139
138
* @param listener - ActionListener for CreateRuleResponse
140
139
*/
141
140
private void persistRule (Rule rule , ActionListener <CreateRuleResponse > listener ) {
142
- try ( ThreadContext . StoredContext ctx = getContext ()) {
141
+ try {
143
142
IndexRequest indexRequest = new IndexRequest (indexName ).source (
144
143
rule .toXContent (XContentFactory .jsonBuilder (), ToXContent .EMPTY_PARAMS )
145
144
);
146
- client .index (indexRequest , ActionListener .wrap (indexResponse -> {
147
- listener .onResponse (new CreateRuleResponse (indexResponse .getId (), rule ));
148
- }, e -> {
149
- logger .warn ("Failed to save Rule object due to error: {}" , e .getMessage ());
150
- listener .onFailure (e );
151
- }));
152
- } catch (IOException e ) {
145
+ IndexResponse indexResponse = client .index (indexRequest ).get ();
146
+ listener .onResponse (new CreateRuleResponse (indexResponse .getId (), rule ));
147
+ } catch (Exception e ) {
153
148
logger .error ("Error saving rule to index: {}" , indexName );
154
149
listener .onFailure (new RuntimeException ("Failed to save rule to index." ));
155
150
}
@@ -161,8 +156,10 @@ private void persistRule(Rule rule, ActionListener<CreateRuleResponse> listener)
161
156
* @param listener the listener for GetRuleResponse.
162
157
*/
163
158
public void getRule (GetRuleRequest getRuleRequest , ActionListener <GetRuleResponse > listener ) {
164
- final QueryBuilder getQueryBuilder = queryBuilder .from (getRuleRequest );
165
- getRuleFromIndex (getRuleRequest .getId (), getQueryBuilder , getRuleRequest .getSearchAfter (), listener );
159
+ try (ThreadContext .StoredContext context = stashContext ()) {
160
+ final QueryBuilder getQueryBuilder = queryBuilder .from (getRuleRequest );
161
+ getRuleFromIndex (getRuleRequest .getId (), getQueryBuilder , getRuleRequest .getSearchAfter (), listener );
162
+ }
166
163
}
167
164
168
165
/**
@@ -173,22 +170,19 @@ public void getRule(GetRuleRequest getRuleRequest, ActionListener<GetRuleRespons
173
170
* @param listener - ActionListener for GetRuleResponse
174
171
*/
175
172
private void getRuleFromIndex (String id , QueryBuilder queryBuilder , String searchAfter , ActionListener <GetRuleResponse > listener ) {
176
- // Stash the current thread context when interacting with system index to perform
177
- // operations as the system itself, bypassing authorization checks. This ensures that
178
- // actions within this block are trusted and executed with system-level privileges.
179
- try (ThreadContext .StoredContext context = getContext ()) {
173
+ try {
180
174
SearchRequestBuilder searchRequest = client .prepareSearch (indexName ).setQuery (queryBuilder ).setSize (maxRulesPerPage );
181
175
if (searchAfter != null ) {
182
176
searchRequest .addSort (_ID_STRING , SortOrder .ASC ).searchAfter (new Object [] { searchAfter });
183
177
}
184
- searchRequest . execute ( ActionListener . wrap ( searchResponse -> {
185
- List < SearchHit > hits = Arrays . asList ( searchResponse . getHits (). getHits () );
186
- if ( hasNoResults ( id , listener , hits )) return ;
187
- handleGetRuleResponse ( hits , listener ) ;
188
- }, e -> {
189
- logger . error ( "Failed to fetch all rules: {}" , e . getMessage ());
190
- listener . onFailure ( e );
191
- }) );
178
+
179
+ SearchResponse searchResponse = searchRequest . get ( );
180
+ List < SearchHit > hits = Arrays . asList ( searchResponse . getHits (). getHits ()) ;
181
+ if ( hasNoResults ( id , listener , hits )) return ;
182
+ handleGetRuleResponse ( hits , listener );
183
+ } catch ( Exception e ) {
184
+ logger . error ( "Failed to fetch all rules: {}" , e . getMessage () );
185
+ listener . onFailure ( e );
192
186
}
193
187
}
194
188
@@ -214,7 +208,7 @@ void handleGetRuleResponse(List<SearchHit> hits, ActionListener<GetRuleResponse>
214
208
215
209
@ Override
216
210
public void deleteRule (DeleteRuleRequest request , ActionListener <AcknowledgedResponse > listener ) {
217
- try (ThreadContext .StoredContext context = getContext ()) {
211
+ try (ThreadContext .StoredContext context = stashContext ()) {
218
212
DeleteRequest deleteRequest = new DeleteRequest (indexName ).id (request .getRuleId ());
219
213
client .delete (deleteRequest , ActionListener .wrap (deleteResponse -> {
220
214
boolean acknowledged = deleteResponse .getResult () == DocWriteResponse .Result .DELETED ;
@@ -241,7 +235,7 @@ public String getIndexName() {
241
235
return indexName ;
242
236
}
243
237
244
- private ThreadContext .StoredContext getContext () {
238
+ private ThreadContext .StoredContext stashContext () {
245
239
return client .threadPool ().getThreadContext ().stashContext ();
246
240
}
247
241
}
0 commit comments