Skip to content

Commit 2f5bf5b

Browse files
authored
Add correlation rule layer for events-correlation-engine (#7132)
Adds new correlation engine feature by way of a new `:plugins:events-correlation-engine` plugin. The endpoint is /_correlation and users can create new rules using the following example DSL: { "name": "s3 to app logs", "correlate": [ { "index": "s3_access_logs", "query": "aws.cloudtrail.eventName:ReplicateObject", "timestampField": "@timestamp", "tags": [ "s3" ] } ] } Signed-off-by: Subhobrata Dey <[email protected]>
1 parent 621b27f commit 2f5bf5b

File tree

26 files changed

+1905
-0
lines changed

26 files changed

+1905
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
88
- Support for HTTP/2 (server-side) ([#3847](https://github.com/opensearch-project/OpenSearch/pull/3847))
99
- Add getter for path field in NestedQueryBuilder ([#4636](https://github.com/opensearch-project/OpenSearch/pull/4636))
1010
- Allow mmap to use new JDK-19 preview APIs in Apache Lucene 9.4+ ([#5151](https://github.com/opensearch-project/OpenSearch/pull/5151))
11+
- Add events correlation engine plugin ([#6854](https://github.com/opensearch-project/OpenSearch/issues/6854))
1112

1213
### Dependencies
1314
- Bump `log4j-core` from 2.18.0 to 2.19.0

gradle/missing-javadoc.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ configure([
164164
configure([
165165
project(":libs:opensearch-common"),
166166
project(":libs:opensearch-core"),
167+
project(":plugins:events-correlation-engine"),
167168
project(":server")
168169
]) {
169170
project.tasks.withType(MissingJavadocTask) {

libs/core/src/main/java/org/opensearch/core/ParseField.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,5 +204,6 @@ public static class CommonFields {
204204
public static final ParseField FORMAT = new ParseField("format");
205205
public static final ParseField MISSING = new ParseField("missing");
206206
public static final ParseField TIME_ZONE = new ParseField("time_zone");
207+
public static final ParseField _META = new ParseField("_meta");
207208
}
208209
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*
8+
* Modifications Copyright OpenSearch Contributors. See
9+
* GitHub history for details.
10+
*/
11+
12+
apply plugin: 'opensearch.java-rest-test'
13+
apply plugin: 'opensearch.internal-cluster-test'
14+
15+
opensearchplugin {
16+
description 'OpenSearch Events Correlation Engine.'
17+
classname 'org.opensearch.plugin.correlation.EventsCorrelationPlugin'
18+
}
19+
20+
dependencies {
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.correlation;
10+
11+
import org.apache.lucene.search.join.ScoreMode;
12+
import org.junit.Assert;
13+
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
14+
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
15+
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
16+
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
17+
import org.opensearch.action.search.SearchRequest;
18+
import org.opensearch.action.search.SearchResponse;
19+
import org.opensearch.index.query.NestedQueryBuilder;
20+
import org.opensearch.index.query.QueryBuilders;
21+
import org.opensearch.plugin.correlation.rules.action.IndexCorrelationRuleAction;
22+
import org.opensearch.plugin.correlation.rules.action.IndexCorrelationRuleRequest;
23+
import org.opensearch.plugin.correlation.rules.action.IndexCorrelationRuleResponse;
24+
import org.opensearch.plugin.correlation.rules.model.CorrelationQuery;
25+
import org.opensearch.plugin.correlation.rules.model.CorrelationRule;
26+
import org.opensearch.plugins.Plugin;
27+
import org.opensearch.plugins.PluginInfo;
28+
import org.opensearch.rest.RestRequest;
29+
import org.opensearch.rest.RestStatus;
30+
import org.opensearch.search.builder.SearchSourceBuilder;
31+
import org.opensearch.test.OpenSearchIntegTestCase;
32+
33+
import java.util.Arrays;
34+
import java.util.Collection;
35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.function.Function;
38+
import java.util.stream.Collectors;
39+
import java.util.stream.Stream;
40+
41+
/**
42+
* Transport Action tests for events-correlation-plugin
43+
*/
44+
public class EventsCorrelationPluginTransportIT extends OpenSearchIntegTestCase {
45+
46+
@Override
47+
protected Collection<Class<? extends Plugin>> nodePlugins() {
48+
return Arrays.asList(EventsCorrelationPlugin.class);
49+
}
50+
51+
/**
52+
* test events-correlation-plugin is installed
53+
*/
54+
public void testPluginsAreInstalled() {
55+
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
56+
nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName());
57+
NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet();
58+
List<PluginInfo> pluginInfos = nodesInfoResponse.getNodes()
59+
.stream()
60+
.flatMap(
61+
(Function<NodeInfo, Stream<PluginInfo>>) nodeInfo -> nodeInfo.getInfo(PluginsAndModules.class).getPluginInfos().stream()
62+
)
63+
.collect(Collectors.toList());
64+
Assert.assertTrue(
65+
pluginInfos.stream()
66+
.anyMatch(pluginInfo -> pluginInfo.getName().equals("org.opensearch.plugin.correlation.EventsCorrelationPlugin"))
67+
);
68+
}
69+
70+
/**
71+
* test creating a correlation rule
72+
* @throws Exception Exception
73+
*/
74+
public void testCreatingACorrelationRule() throws Exception {
75+
List<CorrelationQuery> correlationQueries = Arrays.asList(
76+
new CorrelationQuery("s3_access_logs", "aws.cloudtrail.eventName:ReplicateObject", "@timestamp", List.of("s3")),
77+
new CorrelationQuery("app_logs", "keywords:PermissionDenied", "@timestamp", List.of("others_application"))
78+
);
79+
CorrelationRule correlationRule = new CorrelationRule("s3 to app logs", correlationQueries);
80+
IndexCorrelationRuleRequest request = new IndexCorrelationRuleRequest(correlationRule, RestRequest.Method.POST);
81+
82+
IndexCorrelationRuleResponse response = client().execute(IndexCorrelationRuleAction.INSTANCE, request).get();
83+
Assert.assertEquals(RestStatus.CREATED, response.getStatus());
84+
85+
NestedQueryBuilder queryBuilder = QueryBuilders.nestedQuery(
86+
"correlate",
87+
QueryBuilders.matchQuery("correlate.index", "s3_access_logs"),
88+
ScoreMode.None
89+
);
90+
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
91+
searchSourceBuilder.query(queryBuilder);
92+
searchSourceBuilder.fetchSource(true);
93+
94+
SearchRequest searchRequest = new SearchRequest();
95+
searchRequest.indices(CorrelationRule.CORRELATION_RULE_INDEX);
96+
searchRequest.source(searchSourceBuilder);
97+
98+
SearchResponse searchResponse = client().search(searchRequest).get();
99+
Assert.assertEquals(1L, searchResponse.getHits().getTotalHits().value);
100+
}
101+
102+
/**
103+
* test filtering correlation rules
104+
* @throws Exception Exception
105+
*/
106+
public void testFilteringCorrelationRules() throws Exception {
107+
List<CorrelationQuery> correlationQueries1 = Arrays.asList(
108+
new CorrelationQuery("s3_access_logs", "aws.cloudtrail.eventName:ReplicateObject", "@timestamp", List.of("s3")),
109+
new CorrelationQuery("app_logs", "keywords:PermissionDenied", "@timestamp", List.of("others_application"))
110+
);
111+
CorrelationRule correlationRule1 = new CorrelationRule("s3 to app logs", correlationQueries1);
112+
IndexCorrelationRuleRequest request1 = new IndexCorrelationRuleRequest(correlationRule1, RestRequest.Method.POST);
113+
client().execute(IndexCorrelationRuleAction.INSTANCE, request1).get();
114+
115+
List<CorrelationQuery> correlationQueries2 = Arrays.asList(
116+
new CorrelationQuery("windows", "host.hostname:EC2AMAZ*", "@timestamp", List.of("windows")),
117+
new CorrelationQuery("app_logs", "endpoint:/customer_records.txt", "@timestamp", List.of("others_application"))
118+
);
119+
CorrelationRule correlationRule2 = new CorrelationRule("windows to app logs", correlationQueries2);
120+
IndexCorrelationRuleRequest request2 = new IndexCorrelationRuleRequest(correlationRule2, RestRequest.Method.POST);
121+
client().execute(IndexCorrelationRuleAction.INSTANCE, request2).get();
122+
123+
NestedQueryBuilder queryBuilder = QueryBuilders.nestedQuery(
124+
"correlate",
125+
QueryBuilders.matchQuery("correlate.index", "s3_access_logs"),
126+
ScoreMode.None
127+
);
128+
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
129+
searchSourceBuilder.query(queryBuilder);
130+
searchSourceBuilder.fetchSource(true);
131+
132+
SearchRequest searchRequest = new SearchRequest();
133+
searchRequest.indices(CorrelationRule.CORRELATION_RULE_INDEX);
134+
searchRequest.source(searchSourceBuilder);
135+
136+
SearchResponse searchResponse = client().search(searchRequest).get();
137+
Assert.assertEquals(1L, searchResponse.getHits().getTotalHits().value);
138+
}
139+
140+
/**
141+
* test creating a correlation rule with no timestamp field
142+
* @throws Exception Exception
143+
*/
144+
@SuppressWarnings("unchecked")
145+
public void testCreatingACorrelationRuleWithNoTimestampField() throws Exception {
146+
List<CorrelationQuery> correlationQueries = Arrays.asList(
147+
new CorrelationQuery("s3_access_logs", "aws.cloudtrail.eventName:ReplicateObject", null, List.of("s3")),
148+
new CorrelationQuery("app_logs", "keywords:PermissionDenied", null, List.of("others_application"))
149+
);
150+
CorrelationRule correlationRule = new CorrelationRule("s3 to app logs", correlationQueries);
151+
IndexCorrelationRuleRequest request = new IndexCorrelationRuleRequest(correlationRule, RestRequest.Method.POST);
152+
153+
IndexCorrelationRuleResponse response = client().execute(IndexCorrelationRuleAction.INSTANCE, request).get();
154+
Assert.assertEquals(RestStatus.CREATED, response.getStatus());
155+
156+
NestedQueryBuilder queryBuilder = QueryBuilders.nestedQuery(
157+
"correlate",
158+
QueryBuilders.matchQuery("correlate.index", "s3_access_logs"),
159+
ScoreMode.None
160+
);
161+
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
162+
searchSourceBuilder.query(queryBuilder);
163+
searchSourceBuilder.fetchSource(true);
164+
165+
SearchRequest searchRequest = new SearchRequest();
166+
searchRequest.indices(CorrelationRule.CORRELATION_RULE_INDEX);
167+
searchRequest.source(searchSourceBuilder);
168+
169+
SearchResponse searchResponse = client().search(searchRequest).get();
170+
Assert.assertEquals(1L, searchResponse.getHits().getTotalHits().value);
171+
Assert.assertEquals(
172+
"_timestamp",
173+
((List<Map<String, Object>>) (searchResponse.getHits().getHits()[0].getSourceAsMap().get("correlate"))).get(0)
174+
.get("timestampField")
175+
);
176+
}
177+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.correlation;
10+
11+
import org.junit.Assert;
12+
import org.opensearch.action.search.SearchResponse;
13+
import org.opensearch.client.Request;
14+
import org.opensearch.client.Response;
15+
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
16+
import org.opensearch.common.xcontent.json.JsonXContent;
17+
import org.opensearch.core.xcontent.NamedXContentRegistry;
18+
import org.opensearch.test.rest.OpenSearchRestTestCase;
19+
20+
import java.io.IOException;
21+
import java.util.List;
22+
import java.util.Map;
23+
24+
/**
25+
* Rest Action tests for events-correlation-plugin
26+
*/
27+
public class EventsCorrelationPluginRestIT extends OpenSearchRestTestCase {
28+
29+
/**
30+
* test events-correlation-plugin is installed
31+
* @throws IOException IOException
32+
*/
33+
@SuppressWarnings("unchecked")
34+
public void testPluginsAreInstalled() throws IOException {
35+
Request request = new Request("GET", "/_cat/plugins?s=component&h=name,component,version,description&format=json");
36+
Response response = client().performRequest(request);
37+
List<Object> pluginsList = JsonXContent.jsonXContent.createParser(
38+
NamedXContentRegistry.EMPTY,
39+
LoggingDeprecationHandler.INSTANCE,
40+
response.getEntity().getContent()
41+
).list();
42+
Assert.assertTrue(
43+
pluginsList.stream()
44+
.map(o -> (Map<String, Object>) o)
45+
.anyMatch(plugin -> plugin.get("component").equals("events-correlation-engine"))
46+
);
47+
}
48+
49+
/**
50+
* test creating a correlation rule
51+
* @throws IOException IOException
52+
*/
53+
public void testCreatingACorrelationRule() throws IOException {
54+
Request request = new Request("POST", "/_correlation/rules");
55+
request.setJsonEntity(sampleCorrelationRule());
56+
Response response = client().performRequest(request);
57+
58+
Assert.assertEquals(201, response.getStatusLine().getStatusCode());
59+
60+
Map<String, Object> responseMap = entityAsMap(response);
61+
String id = responseMap.get("_id").toString();
62+
63+
request = new Request("POST", "/.opensearch-correlation-rules-config/_search");
64+
request.setJsonEntity(matchIdQuery(id));
65+
response = client().performRequest(request);
66+
67+
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
68+
SearchResponse searchResponse = SearchResponse.fromXContent(
69+
createParser(JsonXContent.jsonXContent, response.getEntity().getContent())
70+
);
71+
Assert.assertEquals(1L, searchResponse.getHits().getTotalHits().value);
72+
}
73+
74+
/**
75+
* test creating a correlation rule with no timestamp field
76+
* @throws IOException IOException
77+
*/
78+
@SuppressWarnings("unchecked")
79+
public void testCreatingACorrelationRuleWithNoTimestampField() throws IOException {
80+
Request request = new Request("POST", "/_correlation/rules");
81+
request.setJsonEntity(sampleCorrelationRuleWithNoTimestamp());
82+
Response response = client().performRequest(request);
83+
84+
Assert.assertEquals(201, response.getStatusLine().getStatusCode());
85+
86+
Map<String, Object> responseMap = entityAsMap(response);
87+
String id = responseMap.get("_id").toString();
88+
89+
request = new Request("POST", "/.opensearch-correlation-rules-config/_search");
90+
request.setJsonEntity(matchIdQuery(id));
91+
response = client().performRequest(request);
92+
93+
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
94+
SearchResponse searchResponse = SearchResponse.fromXContent(
95+
createParser(JsonXContent.jsonXContent, response.getEntity().getContent())
96+
);
97+
Assert.assertEquals(1L, searchResponse.getHits().getTotalHits().value);
98+
Assert.assertEquals(
99+
"_timestamp",
100+
((List<Map<String, Object>>) (searchResponse.getHits().getHits()[0].getSourceAsMap().get("correlate"))).get(0)
101+
.get("timestampField")
102+
);
103+
}
104+
105+
private String sampleCorrelationRule() {
106+
return "{\n"
107+
+ " \"name\": \"s3 to app logs\",\n"
108+
+ " \"correlate\": [\n"
109+
+ " {\n"
110+
+ " \"index\": \"s3_access_logs\",\n"
111+
+ " \"query\": \"aws.cloudtrail.eventName:ReplicateObject\",\n"
112+
+ " \"timestampField\": \"@timestamp\",\n"
113+
+ " \"tags\": [\n"
114+
+ " \"s3\"\n"
115+
+ " ]\n"
116+
+ " },\n"
117+
+ " {\n"
118+
+ " \"index\": \"app_logs\",\n"
119+
+ " \"query\": \"keywords:PermissionDenied\",\n"
120+
+ " \"timestampField\": \"@timestamp\",\n"
121+
+ " \"tags\": [\n"
122+
+ " \"others_application\"\n"
123+
+ " ]\n"
124+
+ " }\n"
125+
+ " ]\n"
126+
+ "}";
127+
}
128+
129+
private String sampleCorrelationRuleWithNoTimestamp() {
130+
return "{\n"
131+
+ " \"name\": \"s3 to app logs\",\n"
132+
+ " \"correlate\": [\n"
133+
+ " {\n"
134+
+ " \"index\": \"s3_access_logs\",\n"
135+
+ " \"query\": \"aws.cloudtrail.eventName:ReplicateObject\",\n"
136+
+ " \"tags\": [\n"
137+
+ " \"s3\"\n"
138+
+ " ]\n"
139+
+ " },\n"
140+
+ " {\n"
141+
+ " \"index\": \"app_logs\",\n"
142+
+ " \"query\": \"keywords:PermissionDenied\",\n"
143+
+ " \"tags\": [\n"
144+
+ " \"others_application\"\n"
145+
+ " ]\n"
146+
+ " }\n"
147+
+ " ]\n"
148+
+ "}";
149+
}
150+
151+
private String matchIdQuery(String id) {
152+
return "{\n" + " \"query\" : {\n" + " \"match\":{\n" + " \"_id\": \"" + id + "\"\n" + " }\n" + " }\n" + "}";
153+
}
154+
}

0 commit comments

Comments
 (0)