Skip to content

Commit 9f43a02

Browse files
committed
add correlation rule layer for events-correlation-engine
Signed-off-by: Subhobrata Dey <[email protected]>
1 parent bd9b00d commit 9f43a02

File tree

26 files changed

+1936
-0
lines changed

26 files changed

+1936
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
88
- Support for HTTP/2 (server-side) ([#3847](https://github.com/opensearch-project/OpenSearch/pull/3847))
99
- Add getter for path field in NestedQueryBuilder ([#4636](https://github.com/opensearch-project/OpenSearch/pull/4636))
1010
- Allow mmap to use new JDK-19 preview APIs in Apache Lucene 9.4+ ([#5151](https://github.com/opensearch-project/OpenSearch/pull/5151))
11+
- Add events correlation engine plugin ([#6854](https://github.com/opensearch-project/OpenSearch/issues/6854))
1112

1213
### Dependencies
1314
- Bump `log4j-core` from 2.18.0 to 2.19.0

libs/core/src/main/java/org/opensearch/core/ParseField.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,5 +204,6 @@ public static class CommonFields {
204204
public static final ParseField FORMAT = new ParseField("format");
205205
public static final ParseField MISSING = new ParseField("missing");
206206
public static final ParseField TIME_ZONE = new ParseField("time_zone");
207+
public static final ParseField _META = new ParseField("_meta");
207208
}
208209
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*
8+
* Modifications Copyright OpenSearch Contributors. See
9+
* GitHub history for details.
10+
*/
11+
12+
apply plugin: 'opensearch.java-rest-test'
13+
apply plugin: 'opensearch.internal-cluster-test'
14+
15+
opensearchplugin {
16+
description 'OpenSearch Events Correlation Engine.'
17+
classname 'org.opensearch.plugin.correlation.EventsCorrelationPlugin'
18+
}
19+
20+
dependencies {
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.correlation;
10+
11+
import org.apache.lucene.search.join.ScoreMode;
12+
import org.junit.Assert;
13+
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
14+
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
15+
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
16+
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
17+
import org.opensearch.action.search.SearchRequest;
18+
import org.opensearch.action.search.SearchResponse;
19+
import org.opensearch.index.query.NestedQueryBuilder;
20+
import org.opensearch.index.query.QueryBuilders;
21+
import org.opensearch.plugin.correlation.rules.action.IndexCorrelationRuleAction;
22+
import org.opensearch.plugin.correlation.rules.action.IndexCorrelationRuleRequest;
23+
import org.opensearch.plugin.correlation.rules.action.IndexCorrelationRuleResponse;
24+
import org.opensearch.plugin.correlation.rules.model.CorrelationQuery;
25+
import org.opensearch.plugin.correlation.rules.model.CorrelationRule;
26+
import org.opensearch.plugin.correlation.utils.CorrelationConstants;
27+
import org.opensearch.plugins.Plugin;
28+
import org.opensearch.plugins.PluginInfo;
29+
import org.opensearch.rest.RestRequest;
30+
import org.opensearch.rest.RestStatus;
31+
import org.opensearch.search.builder.SearchSourceBuilder;
32+
import org.opensearch.test.OpenSearchIntegTestCase;
33+
34+
import java.util.Arrays;
35+
import java.util.Collection;
36+
import java.util.List;
37+
import java.util.Map;
38+
import java.util.concurrent.ExecutionException;
39+
import java.util.function.Function;
40+
import java.util.stream.Collectors;
41+
import java.util.stream.Stream;
42+
43+
public class EventsCorrelationPluginTransportIT extends OpenSearchIntegTestCase {
44+
45+
@Override
46+
protected Collection<Class<? extends Plugin>> nodePlugins() {
47+
return Arrays.asList(EventsCorrelationPlugin.class);
48+
}
49+
50+
public void testPluginsAreInstalled() {
51+
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
52+
nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName());
53+
NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet();
54+
List<PluginInfo> pluginInfos = nodesInfoResponse.getNodes()
55+
.stream()
56+
.flatMap(
57+
(Function<NodeInfo, Stream<PluginInfo>>) nodeInfo -> nodeInfo.getInfo(PluginsAndModules.class).getPluginInfos().stream()
58+
)
59+
.collect(Collectors.toList());
60+
Assert.assertTrue(
61+
pluginInfos.stream()
62+
.anyMatch(pluginInfo -> pluginInfo.getName().equals("org.opensearch.plugin.correlation.EventsCorrelationPlugin"))
63+
);
64+
}
65+
66+
public void testCreatingACorrelationRule() throws ExecutionException, InterruptedException {
67+
List<CorrelationQuery> correlationQueries = Arrays.asList(
68+
new CorrelationQuery("s3_access_logs", "aws.cloudtrail.eventName:ReplicateObject", "@timestamp", List.of("s3")),
69+
new CorrelationQuery("app_logs", "keywords:PermissionDenied", "@timestamp", List.of("others_application"))
70+
);
71+
CorrelationRule correlationRule = new CorrelationRule(
72+
CorrelationConstants.NO_ID,
73+
CorrelationConstants.NO_VERSION,
74+
"s3 to app logs",
75+
correlationQueries
76+
);
77+
IndexCorrelationRuleRequest request = new IndexCorrelationRuleRequest(
78+
CorrelationConstants.NO_ID,
79+
correlationRule,
80+
RestRequest.Method.POST
81+
);
82+
83+
IndexCorrelationRuleResponse response = client().execute(IndexCorrelationRuleAction.INSTANCE, request).get();
84+
Assert.assertEquals(RestStatus.CREATED, response.getStatus());
85+
86+
NestedQueryBuilder queryBuilder = QueryBuilders.nestedQuery(
87+
"correlate",
88+
QueryBuilders.matchQuery("correlate.index", "s3_access_logs"),
89+
ScoreMode.None
90+
);
91+
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
92+
searchSourceBuilder.query(queryBuilder);
93+
searchSourceBuilder.fetchSource(true);
94+
95+
SearchRequest searchRequest = new SearchRequest();
96+
searchRequest.indices(CorrelationRule.CORRELATION_RULE_INDEX);
97+
searchRequest.source(searchSourceBuilder);
98+
99+
SearchResponse searchResponse = client().search(searchRequest).get();
100+
Assert.assertEquals(1L, searchResponse.getHits().getTotalHits().value);
101+
}
102+
103+
public void testFilteringCorrelationRules() throws ExecutionException, InterruptedException {
104+
List<CorrelationQuery> correlationQueries1 = Arrays.asList(
105+
new CorrelationQuery("s3_access_logs", "aws.cloudtrail.eventName:ReplicateObject", "@timestamp", List.of("s3")),
106+
new CorrelationQuery("app_logs", "keywords:PermissionDenied", "@timestamp", List.of("others_application"))
107+
);
108+
CorrelationRule correlationRule1 = new CorrelationRule(
109+
CorrelationConstants.NO_ID,
110+
CorrelationConstants.NO_VERSION,
111+
"s3 to app logs",
112+
correlationQueries1
113+
);
114+
IndexCorrelationRuleRequest request1 = new IndexCorrelationRuleRequest(
115+
CorrelationConstants.NO_ID,
116+
correlationRule1,
117+
RestRequest.Method.POST
118+
);
119+
client().execute(IndexCorrelationRuleAction.INSTANCE, request1).get();
120+
121+
List<CorrelationQuery> correlationQueries2 = Arrays.asList(
122+
new CorrelationQuery("windows", "host.hostname:EC2AMAZ*", "@timestamp", List.of("windows")),
123+
new CorrelationQuery("app_logs", "endpoint:/customer_records.txt", "@timestamp", List.of("others_application"))
124+
);
125+
CorrelationRule correlationRule2 = new CorrelationRule(
126+
CorrelationConstants.NO_ID,
127+
CorrelationConstants.NO_VERSION,
128+
"windows to app logs",
129+
correlationQueries2
130+
);
131+
IndexCorrelationRuleRequest request2 = new IndexCorrelationRuleRequest(
132+
CorrelationConstants.NO_ID,
133+
correlationRule2,
134+
RestRequest.Method.POST
135+
);
136+
client().execute(IndexCorrelationRuleAction.INSTANCE, request2).get();
137+
138+
NestedQueryBuilder queryBuilder = QueryBuilders.nestedQuery(
139+
"correlate",
140+
QueryBuilders.matchQuery("correlate.index", "s3_access_logs"),
141+
ScoreMode.None
142+
);
143+
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
144+
searchSourceBuilder.query(queryBuilder);
145+
searchSourceBuilder.fetchSource(true);
146+
147+
SearchRequest searchRequest = new SearchRequest();
148+
searchRequest.indices(CorrelationRule.CORRELATION_RULE_INDEX);
149+
searchRequest.source(searchSourceBuilder);
150+
151+
SearchResponse searchResponse = client().search(searchRequest).get();
152+
Assert.assertEquals(1L, searchResponse.getHits().getTotalHits().value);
153+
}
154+
155+
@SuppressWarnings("unchecked")
156+
public void testCreatingACorrelationRuleWithNoTimestampField() throws ExecutionException, InterruptedException {
157+
List<CorrelationQuery> correlationQueries = Arrays.asList(
158+
new CorrelationQuery("s3_access_logs", "aws.cloudtrail.eventName:ReplicateObject", null, List.of("s3")),
159+
new CorrelationQuery("app_logs", "keywords:PermissionDenied", null, List.of("others_application"))
160+
);
161+
CorrelationRule correlationRule = new CorrelationRule(
162+
CorrelationConstants.NO_ID,
163+
CorrelationConstants.NO_VERSION,
164+
"s3 to app logs",
165+
correlationQueries
166+
);
167+
IndexCorrelationRuleRequest request = new IndexCorrelationRuleRequest(
168+
CorrelationConstants.NO_ID,
169+
correlationRule,
170+
RestRequest.Method.POST
171+
);
172+
173+
IndexCorrelationRuleResponse response = client().execute(IndexCorrelationRuleAction.INSTANCE, request).get();
174+
Assert.assertEquals(RestStatus.CREATED, response.getStatus());
175+
176+
NestedQueryBuilder queryBuilder = QueryBuilders.nestedQuery(
177+
"correlate",
178+
QueryBuilders.matchQuery("correlate.index", "s3_access_logs"),
179+
ScoreMode.None
180+
);
181+
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
182+
searchSourceBuilder.query(queryBuilder);
183+
searchSourceBuilder.fetchSource(true);
184+
185+
SearchRequest searchRequest = new SearchRequest();
186+
searchRequest.indices(CorrelationRule.CORRELATION_RULE_INDEX);
187+
searchRequest.source(searchSourceBuilder);
188+
189+
SearchResponse searchResponse = client().search(searchRequest).get();
190+
Assert.assertEquals(1L, searchResponse.getHits().getTotalHits().value);
191+
Assert.assertEquals(
192+
"_timestamp",
193+
((List<Map<String, Object>>) (searchResponse.getHits().getHits()[0].getSourceAsMap().get("correlate"))).get(0)
194+
.get("timestampField")
195+
);
196+
}
197+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.correlation;
10+
11+
import org.junit.Assert;
12+
import org.opensearch.action.search.SearchResponse;
13+
import org.opensearch.client.Request;
14+
import org.opensearch.client.Response;
15+
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
16+
import org.opensearch.common.xcontent.json.JsonXContent;
17+
import org.opensearch.core.xcontent.NamedXContentRegistry;
18+
import org.opensearch.test.rest.OpenSearchRestTestCase;
19+
20+
import java.io.IOException;
21+
import java.util.List;
22+
import java.util.Map;
23+
24+
public class EventsCorrelationPluginRestIT extends OpenSearchRestTestCase {
25+
26+
@SuppressWarnings("unchecked")
27+
public void testPluginsAreInstalled() throws IOException {
28+
Request request = new Request("GET", "/_cat/plugins?s=component&h=name,component,version,description&format=json");
29+
Response response = client().performRequest(request);
30+
List<Object> pluginsList = JsonXContent.jsonXContent.createParser(
31+
NamedXContentRegistry.EMPTY,
32+
LoggingDeprecationHandler.INSTANCE,
33+
response.getEntity().getContent()
34+
).list();
35+
Assert.assertTrue(
36+
pluginsList.stream()
37+
.map(o -> (Map<String, Object>) o)
38+
.anyMatch(plugin -> plugin.get("component").equals("events-correlation-engine"))
39+
);
40+
}
41+
42+
public void testCreatingACorrelationRule() throws IOException {
43+
Request request = new Request("POST", "/_correlation/rules");
44+
request.setJsonEntity(sampleCorrelationRule());
45+
Response response = client().performRequest(request);
46+
47+
Assert.assertEquals(201, response.getStatusLine().getStatusCode());
48+
49+
Map<String, Object> responseMap = entityAsMap(response);
50+
String id = responseMap.get("_id").toString();
51+
52+
request = new Request("POST", "/.opensearch-correlation-rules-config/_search");
53+
request.setJsonEntity(matchIdQuery(id));
54+
response = client().performRequest(request);
55+
56+
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
57+
SearchResponse searchResponse = SearchResponse.fromXContent(
58+
createParser(JsonXContent.jsonXContent, response.getEntity().getContent())
59+
);
60+
Assert.assertEquals(1L, searchResponse.getHits().getTotalHits().value);
61+
}
62+
63+
@SuppressWarnings("unchecked")
64+
public void testCreatingACorrelationRuleWithNoTimestampField() throws IOException {
65+
Request request = new Request("POST", "/_correlation/rules");
66+
request.setJsonEntity(sampleCorrelationRuleWithNoTimestamp());
67+
Response response = client().performRequest(request);
68+
69+
Assert.assertEquals(201, response.getStatusLine().getStatusCode());
70+
71+
Map<String, Object> responseMap = entityAsMap(response);
72+
String id = responseMap.get("_id").toString();
73+
74+
request = new Request("POST", "/.opensearch-correlation-rules-config/_search");
75+
request.setJsonEntity(matchIdQuery(id));
76+
response = client().performRequest(request);
77+
78+
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
79+
SearchResponse searchResponse = SearchResponse.fromXContent(
80+
createParser(JsonXContent.jsonXContent, response.getEntity().getContent())
81+
);
82+
Assert.assertEquals(1L, searchResponse.getHits().getTotalHits().value);
83+
Assert.assertEquals(
84+
"_timestamp",
85+
((List<Map<String, Object>>) (searchResponse.getHits().getHits()[0].getSourceAsMap().get("correlate"))).get(0)
86+
.get("timestampField")
87+
);
88+
}
89+
90+
private String sampleCorrelationRule() {
91+
return "{\n"
92+
+ " \"name\": \"s3 to app logs\",\n"
93+
+ " \"correlate\": [\n"
94+
+ " {\n"
95+
+ " \"index\": \"s3_access_logs\",\n"
96+
+ " \"query\": \"aws.cloudtrail.eventName:ReplicateObject\",\n"
97+
+ " \"timestampField\": \"@timestamp\",\n"
98+
+ " \"tags\": [\n"
99+
+ " \"s3\"\n"
100+
+ " ]\n"
101+
+ " },\n"
102+
+ " {\n"
103+
+ " \"index\": \"app_logs\",\n"
104+
+ " \"query\": \"keywords:PermissionDenied\",\n"
105+
+ " \"timestampField\": \"@timestamp\",\n"
106+
+ " \"tags\": [\n"
107+
+ " \"others_application\"\n"
108+
+ " ]\n"
109+
+ " }\n"
110+
+ " ]\n"
111+
+ "}";
112+
}
113+
114+
private String sampleCorrelationRuleWithNoTimestamp() {
115+
return "{\n"
116+
+ " \"name\": \"s3 to app logs\",\n"
117+
+ " \"correlate\": [\n"
118+
+ " {\n"
119+
+ " \"index\": \"s3_access_logs\",\n"
120+
+ " \"query\": \"aws.cloudtrail.eventName:ReplicateObject\",\n"
121+
+ " \"tags\": [\n"
122+
+ " \"s3\"\n"
123+
+ " ]\n"
124+
+ " },\n"
125+
+ " {\n"
126+
+ " \"index\": \"app_logs\",\n"
127+
+ " \"query\": \"keywords:PermissionDenied\",\n"
128+
+ " \"tags\": [\n"
129+
+ " \"others_application\"\n"
130+
+ " ]\n"
131+
+ " }\n"
132+
+ " ]\n"
133+
+ "}";
134+
}
135+
136+
private String matchIdQuery(String id) {
137+
return "{\n" + " \"query\" : {\n" + " \"match\":{\n" + " \"_id\": \"" + id + "\"\n" + " }\n" + " }\n" + "}";
138+
}
139+
}

0 commit comments

Comments
 (0)