Skip to content

Commit 64e2329

Browse files
ansjcyrayshrey
authored andcommitted
Query insights plugin implementation (opensearch-project#11903)
* Query insights plugin implementation Signed-off-by: Chenyang Ji <[email protected]> * Increase JavaDoc coverage and update PR based comments Signed-off-by: Chenyang Ji <[email protected]> * Refactor record and service to make them generic Signed-off-by: Chenyang Ji <[email protected]> * refactor service for improving multithreading efficiency Signed-off-by: Chenyang Ji <[email protected]> --------- Signed-off-by: Chenyang Ji <[email protected]>
1 parent 3758a09 commit 64e2329

24 files changed

+1622
-26
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
223223
- Extract cluster management for integration tests into JUnit test rule out of OpenSearchIntegTestCase ([#11877](https://github.com/opensearch-project/OpenSearch/pull/11877)), ([#12000](https://github.com/opensearch-project/OpenSearch/pull/12000))
224224
- Workaround for https://bugs.openjdk.org/browse/JDK-8323659 regression, introduced in JDK-21.0.2 ([#11968](https://github.com/opensearch-project/OpenSearch/pull/11968))
225225
- Updates IpField to be searchable when only `doc_values` are enabled ([#11508](https://github.com/opensearch-project/OpenSearch/pull/11508))
226+
- [Query Insights] Query Insights Framework which currently supports retrieving the most time-consuming queries within the last configured time window ([#11903](https://github.com/opensearch-project/OpenSearch/pull/11903))
226227

227228
### Deprecated
228229

plugins/query-insights/build.gradle

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*
8+
* Modifications Copyright OpenSearch Contributors. See
9+
* GitHub history for details.
10+
*/
11+
12+
opensearchplugin {
13+
description 'OpenSearch Query Insights Plugin.'
14+
classname 'org.opensearch.plugin.insights.QueryInsightsPlugin'
15+
}
16+
17+
dependencies {
18+
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.insights;
10+
11+
import org.opensearch.action.ActionRequest;
12+
import org.opensearch.client.Client;
13+
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
14+
import org.opensearch.cluster.node.DiscoveryNodes;
15+
import org.opensearch.cluster.service.ClusterService;
16+
import org.opensearch.common.settings.ClusterSettings;
17+
import org.opensearch.common.settings.IndexScopedSettings;
18+
import org.opensearch.common.settings.Setting;
19+
import org.opensearch.common.settings.Settings;
20+
import org.opensearch.common.settings.SettingsFilter;
21+
import org.opensearch.common.unit.TimeValue;
22+
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
23+
import org.opensearch.core.action.ActionResponse;
24+
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
25+
import org.opensearch.core.xcontent.NamedXContentRegistry;
26+
import org.opensearch.env.Environment;
27+
import org.opensearch.env.NodeEnvironment;
28+
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
29+
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
30+
import org.opensearch.plugins.ActionPlugin;
31+
import org.opensearch.plugins.Plugin;
32+
import org.opensearch.repositories.RepositoriesService;
33+
import org.opensearch.rest.RestController;
34+
import org.opensearch.rest.RestHandler;
35+
import org.opensearch.script.ScriptService;
36+
import org.opensearch.threadpool.ExecutorBuilder;
37+
import org.opensearch.threadpool.ScalingExecutorBuilder;
38+
import org.opensearch.threadpool.ThreadPool;
39+
import org.opensearch.watcher.ResourceWatcherService;
40+
41+
import java.util.Collection;
42+
import java.util.List;
43+
import java.util.function.Supplier;
44+
45+
/**
46+
* Plugin class for Query Insights.
47+
*/
48+
public class QueryInsightsPlugin extends Plugin implements ActionPlugin {
49+
/**
50+
* Default constructor
51+
*/
52+
public QueryInsightsPlugin() {}
53+
54+
@Override
55+
public Collection<Object> createComponents(
56+
final Client client,
57+
final ClusterService clusterService,
58+
final ThreadPool threadPool,
59+
final ResourceWatcherService resourceWatcherService,
60+
final ScriptService scriptService,
61+
final NamedXContentRegistry xContentRegistry,
62+
final Environment environment,
63+
final NodeEnvironment nodeEnvironment,
64+
final NamedWriteableRegistry namedWriteableRegistry,
65+
final IndexNameExpressionResolver indexNameExpressionResolver,
66+
final Supplier<RepositoriesService> repositoriesServiceSupplier
67+
) {
68+
// create top n queries service
69+
final QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool);
70+
return List.of(queryInsightsService);
71+
}
72+
73+
@Override
74+
public List<ExecutorBuilder<?>> getExecutorBuilders(final Settings settings) {
75+
return List.of(
76+
new ScalingExecutorBuilder(
77+
QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR,
78+
1,
79+
Math.min((OpenSearchExecutors.allocatedProcessors(settings) + 1) / 2, QueryInsightsSettings.MAX_THREAD_COUNT),
80+
TimeValue.timeValueMinutes(5)
81+
)
82+
);
83+
}
84+
85+
@Override
86+
public List<RestHandler> getRestHandlers(
87+
final Settings settings,
88+
final RestController restController,
89+
final ClusterSettings clusterSettings,
90+
final IndexScopedSettings indexScopedSettings,
91+
final SettingsFilter settingsFilter,
92+
final IndexNameExpressionResolver indexNameExpressionResolver,
93+
final Supplier<DiscoveryNodes> nodesInCluster
94+
) {
95+
return List.of();
96+
}
97+
98+
@Override
99+
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
100+
return List.of();
101+
}
102+
103+
@Override
104+
public List<Setting<?>> getSettings() {
105+
return List.of(
106+
// Settings for top N queries
107+
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED,
108+
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE,
109+
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE
110+
);
111+
}
112+
}
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.insights.core.service;
10+
11+
import org.opensearch.common.inject.Inject;
12+
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
13+
import org.opensearch.plugin.insights.rules.model.MetricType;
14+
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
15+
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
16+
import org.opensearch.threadpool.Scheduler;
17+
import org.opensearch.threadpool.ThreadPool;
18+
19+
import java.util.ArrayList;
20+
import java.util.Comparator;
21+
import java.util.HashMap;
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.concurrent.LinkedBlockingQueue;
25+
26+
/**
27+
* Service responsible for gathering, analyzing, storing and exporting
28+
* information related to search queries
29+
*
30+
* @opensearch.internal
31+
*/
32+
public class QueryInsightsService extends AbstractLifecycleComponent {
33+
/**
34+
* The internal OpenSearch thread pool that execute async processing and exporting tasks
35+
*/
36+
private final ThreadPool threadPool;
37+
38+
/**
39+
* Services to capture top n queries for different metric types
40+
*/
41+
private final Map<MetricType, TopQueriesService> topQueriesServices;
42+
43+
/**
44+
* Flags for enabling insight data collection for different metric types
45+
*/
46+
private final Map<MetricType, Boolean> enableCollect;
47+
48+
/**
49+
* The internal thread-safe queue to ingest the search query data and subsequently forward to processors
50+
*/
51+
private final LinkedBlockingQueue<SearchQueryRecord> queryRecordsQueue;
52+
53+
/**
54+
* Holds a reference to delayed operation {@link Scheduler.Cancellable} so it can be cancelled when
55+
* the service closed concurrently.
56+
*/
57+
protected volatile Scheduler.Cancellable scheduledFuture;
58+
59+
/**
60+
* Constructor of the QueryInsightsService
61+
*
62+
* @param threadPool The OpenSearch thread pool to run async tasks
63+
*/
64+
@Inject
65+
public QueryInsightsService(final ThreadPool threadPool) {
66+
enableCollect = new HashMap<>();
67+
queryRecordsQueue = new LinkedBlockingQueue<>(QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY);
68+
topQueriesServices = new HashMap<>();
69+
for (MetricType metricType : MetricType.allMetricTypes()) {
70+
enableCollect.put(metricType, false);
71+
topQueriesServices.put(metricType, new TopQueriesService(metricType));
72+
}
73+
this.threadPool = threadPool;
74+
}
75+
76+
/**
77+
* Ingest the query data into in-memory stores
78+
*
79+
* @param record the record to ingest
80+
*/
81+
public boolean addRecord(final SearchQueryRecord record) {
82+
boolean shouldAdd = false;
83+
for (Map.Entry<MetricType, TopQueriesService> entry : topQueriesServices.entrySet()) {
84+
if (!enableCollect.get(entry.getKey())) {
85+
continue;
86+
}
87+
List<SearchQueryRecord> currentSnapshot = entry.getValue().getTopQueriesCurrentSnapshot();
88+
// skip add to top N queries store if the incoming record is smaller than the Nth record
89+
if (currentSnapshot.size() < entry.getValue().getTopNSize()
90+
|| SearchQueryRecord.compare(record, currentSnapshot.get(0), entry.getKey()) > 0) {
91+
shouldAdd = true;
92+
break;
93+
}
94+
}
95+
if (shouldAdd) {
96+
return queryRecordsQueue.offer(record);
97+
}
98+
return false;
99+
}
100+
101+
/**
102+
* Drain the queryRecordsQueue into internal stores and services
103+
*/
104+
public void drainRecords() {
105+
final List<SearchQueryRecord> records = new ArrayList<>();
106+
queryRecordsQueue.drainTo(records);
107+
records.sort(Comparator.comparingLong(SearchQueryRecord::getTimestamp));
108+
for (MetricType metricType : MetricType.allMetricTypes()) {
109+
if (enableCollect.get(metricType)) {
110+
// ingest the records into topQueriesService
111+
topQueriesServices.get(metricType).consumeRecords(records);
112+
}
113+
}
114+
}
115+
116+
/**
117+
* Get the top queries service based on metricType
118+
* @param metricType {@link MetricType}
119+
* @return {@link TopQueriesService}
120+
*/
121+
public TopQueriesService getTopQueriesService(final MetricType metricType) {
122+
return topQueriesServices.get(metricType);
123+
}
124+
125+
/**
126+
* Set flag to enable or disable Query Insights data collection
127+
*
128+
* @param metricType {@link MetricType}
129+
* @param enable Flag to enable or disable Query Insights data collection
130+
*/
131+
public void enableCollection(final MetricType metricType, final boolean enable) {
132+
this.enableCollect.put(metricType, enable);
133+
this.topQueriesServices.get(metricType).setEnabled(enable);
134+
}
135+
136+
/**
137+
* Get if the Query Insights data collection is enabled for a MetricType
138+
*
139+
* @param metricType {@link MetricType}
140+
* @return if the Query Insights data collection is enabled
141+
*/
142+
public boolean isCollectionEnabled(final MetricType metricType) {
143+
return this.enableCollect.get(metricType);
144+
}
145+
146+
/**
147+
* Check if query insights service is enabled
148+
*
149+
* @return if query insights service is enabled
150+
*/
151+
public boolean isEnabled() {
152+
for (MetricType t : MetricType.allMetricTypes()) {
153+
if (isCollectionEnabled(t)) {
154+
return true;
155+
}
156+
}
157+
return false;
158+
}
159+
160+
@Override
161+
protected void doStart() {
162+
if (isEnabled()) {
163+
scheduledFuture = threadPool.scheduleWithFixedDelay(
164+
this::drainRecords,
165+
QueryInsightsSettings.QUERY_RECORD_QUEUE_DRAIN_INTERVAL,
166+
QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR
167+
);
168+
}
169+
}
170+
171+
@Override
172+
protected void doStop() {
173+
if (scheduledFuture != null) {
174+
scheduledFuture.cancel();
175+
}
176+
}
177+
178+
@Override
179+
protected void doClose() {}
180+
}

0 commit comments

Comments
 (0)