Skip to content

Commit 732e431

Browse files
authored
Prometheus Query Exemplars (#1782) (#1905)
Signed-off-by: Vamsi Manohar <[email protected]> (cherry picked from commit 430d7a9)
1 parent 4d7a02c commit 732e431

File tree

36 files changed

+1504
-212
lines changed

36 files changed

+1504
-212
lines changed

core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public Pair<FunctionSignature, FunctionBuilder> resolve(
7070
FunctionSignature unresolvedSignature) {
7171
FunctionName functionName = FunctionName.of("query_range");
7272
FunctionSignature functionSignature =
73-
new FunctionSignature(functionName, List.of(STRING, LONG, LONG, LONG));
73+
new FunctionSignature(functionName, List.of(STRING, LONG, LONG, STRING));
7474
return Pair.of(
7575
functionSignature,
7676
(functionProperties, args) ->

docs/user/ppl/admin/prometheus_connector.rst

Lines changed: 73 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -191,12 +191,11 @@ PromQL Support for prometheus Connector
191191

192192
`query_range` Table Function
193193
----------------------------
194-
Prometheus connector offers `query_range` table function. This table function can be used to query metrics in a specific time range using promQL.
195-
The function takes inputs similar to parameters mentioned for query range api mentioned here: https://prometheus.io/docs/prometheus/latest/querying/api/
196-
Arguments should be either passed by name or positionArguments should be either passed by name or position.
197-
`source=my_prometheus.query_range('prometheus_http_requests_total', 1686694425, 1686700130, 14)`
198-
or
199-
`source=my_prometheus.query_range(query='prometheus_http_requests_total', starttime=1686694425, endtime=1686700130, step=14)`
194+
* Prometheus connector offers `query_range` table function. This table function can be used to query metrics in a specific time range using promQL.
195+
* The function takes inputs similar to parameters mentioned for query range api mentioned here: https://prometheus.io/docs/prometheus/latest/querying/api/
196+
* Arguments should be either passed by name or positionArguments should be either passed by name or position.
197+
- `source=my_prometheus.query_range('prometheus_http_requests_total', 1686694425, 1686700130, 14)`
198+
- `source=my_prometheus.query_range(query='prometheus_http_requests_total', starttime=1686694425, endtime=1686700130, step=14)`
200199
Example::
201200

202201
> source=my_prometheus.query_range('prometheus_http_requests_total', 1686694425, 1686700130, 14)
@@ -210,3 +209,71 @@ Example::
210209
| 9 | "2022-11-03 07:18:54" | "/-/promql" | 400 | 192.15.2.1 | prometheus |
211210
| 11 | "2022-11-03 07:18:64" |"/-/metrics" | 500 | 192.15.2.1 | prometheus |
212211
+------------+------------------------+--------------------------------+---------------+-------------+-------------+
212+
213+
214+
Prometheus Connector Table Functions
215+
==========================================
216+
217+
`query_exemplars` Table Function
218+
----------------------------
219+
* This table function can be used to fetch exemplars of a query in a specific time range.
220+
* The function takes inputs similar to parameters mentioned for query exemplars api mentioned here: https://prometheus.io/docs/prometheus/latest/querying/api/
221+
* Arguments should be either passed by name or positionArguments should be either passed by name or position.
222+
- `source=my_prometheus.query_exemplars('prometheus_http_requests_total', 1686694425, 1686700130)`
223+
- `source=my_prometheus.query_exemplars(query='prometheus_http_requests_total', starttime=1686694425, endtime=1686700130)`
224+
Example::
225+
226+
> source=my_prometheus.query_exemplars('prometheus_http_requests_total', 1686694425, 1686700130)
227+
"schema": [
228+
{
229+
"name": "seriesLabels",
230+
"type": "struct"
231+
},
232+
{
233+
"name": "exemplars",
234+
"type": "array"
235+
}
236+
],
237+
"datarows": [
238+
[
239+
{
240+
"instance": "localhost:8090",
241+
"__name__": "test_exemplar_metric_total",
242+
"service": "bar",
243+
"job": "prometheus"
244+
},
245+
[
246+
{
247+
"labels": {
248+
"traceID": "EpTxMJ40fUus7aGY"
249+
},
250+
"timestamp": "2020-09-14 15:22:25.479",
251+
"value": 6.0
252+
}
253+
]
254+
],
255+
[
256+
{
257+
"instance": "localhost:8090",
258+
"__name__": "test_exemplar_metric_total",
259+
"service": "foo",
260+
"job": "prometheus"
261+
},
262+
[
263+
{
264+
"labels": {
265+
"traceID": "Olp9XHlq763ccsfa"
266+
},
267+
"timestamp": "2020-09-14 15:22:35.479",
268+
"value": 19.0
269+
},
270+
{
271+
"labels": {
272+
"traceID": "hCtjygkIHwAN9vs4"
273+
},
274+
"timestamp": "2020-09-14 15:22:45.489",
275+
"value": 20.0
276+
}
277+
]
278+
]
279+
]

integ-test/src/test/java/org/opensearch/sql/ppl/PrometheusDataSourceCommandsIT.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -231,11 +231,11 @@ public void testQueryRange() {
231231
long currentTimestamp = new Date().getTime();
232232
JSONObject response =
233233
executeQuery("source=my_prometheus.query_range('prometheus_http_requests_total',"
234-
+ ((currentTimestamp/1000)-3600) + "," + currentTimestamp/1000 + ", " + 14 + ")" );
234+
+ ((currentTimestamp/1000)-3600) + "," + currentTimestamp/1000 + ", " + "'14'" + ")" );
235235
verifySchema(response,
236+
schema(LABELS, "struct"),
236237
schema(VALUE, "array"),
237-
schema(TIMESTAMP, "array"),
238-
schema(LABELS, "struct"));
238+
schema(TIMESTAMP, "array"));
239239
Assertions.assertTrue(response.getInt("size") > 0);
240240
}
241241

@@ -249,8 +249,19 @@ public void explainQueryRange() throws Exception {
249249
);
250250
}
251251

252+
@Test
253+
public void testExplainForQueryExemplars() throws Exception {
254+
String expected = loadFromFile("expectedOutput/ppl/explain_query_exemplars.json");
255+
assertJsonEquals(
256+
expected,
257+
explainQueryToString("source = my_prometheus."
258+
+ "query_exemplars('app_ads_ad_requests_total',1689228292,1689232299)")
259+
);
260+
}
261+
252262
String loadFromFile(String filename) throws Exception {
253263
URI uri = Resources.getResource(filename).toURI();
254264
return new String(Files.readAllBytes(Paths.get(uri)));
255265
}
266+
256267
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{
2+
"root": {
3+
"name": "QueryExemplarsFunctionTableScanOperator",
4+
"description": {
5+
"request": "query_exemplars(app_ads_ad_requests_total, 1689228292, 1689232299)"
6+
},
7+
"children": []
8+
}
9+
}

prometheus/src/main/java/org/opensearch/sql/prometheus/client/PrometheusClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import java.io.IOException;
99
import java.util.List;
1010
import java.util.Map;
11+
import org.json.JSONArray;
1112
import org.json.JSONObject;
1213
import org.opensearch.sql.prometheus.request.system.model.MetricMetadata;
1314

@@ -18,4 +19,6 @@ public interface PrometheusClient {
1819
List<String> getLabels(String metricName) throws IOException;
1920

2021
Map<String, List<MetricMetadata>> getAllMetrics() throws IOException;
22+
23+
JSONArray queryExemplars(String query, Long start, Long end) throws IOException;
2124
}

prometheus/src/main/java/org/opensearch/sql/prometheus/client/PrometheusClientImpl.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,20 @@ public Map<String, List<MetricMetadata>> getAllMetrics() throws IOException {
8383
return new ObjectMapper().readValue(jsonObject.getJSONObject("data").toString(), typeRef);
8484
}
8585

86+
@Override
87+
public JSONArray queryExemplars(String query, Long start, Long end) throws IOException {
88+
String queryUrl = String.format("%s/api/v1/query_exemplars?query=%s&start=%s&end=%s",
89+
uri.toString().replaceAll("/$", ""), URLEncoder.encode(query, StandardCharsets.UTF_8),
90+
start, end);
91+
logger.debug("queryUrl: " + queryUrl);
92+
Request request = new Request.Builder()
93+
.url(queryUrl)
94+
.build();
95+
Response response = this.okHttpClient.newCall(request).execute();
96+
JSONObject jsonObject = readResponse(response);
97+
return jsonObject.getJSONArray("data");
98+
}
99+
86100
private List<String> toListOfLabels(JSONArray array) {
87101
List<String> result = new ArrayList<>();
88102
for (int i = 0; i < array.length(); i++) {

prometheus/src/main/java/org/opensearch/sql/prometheus/data/constants/PrometheusFieldConstants.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,15 @@ public class PrometheusFieldConstants {
99
public static final String TIMESTAMP = "@timestamp";
1010
public static final String VALUE = "@value";
1111
public static final String LABELS = "@labels";
12+
public static final String MATRIX_KEY = "matrix";
13+
public static final String RESULT_TYPE_KEY = "resultType";
14+
public static final String METRIC_KEY = "metric";
15+
public static final String RESULT_KEY = "result";
16+
public static final String VALUES_KEY = "values";
17+
public static final String SERIES_LABELS_KEY = "seriesLabels";
18+
public static final String EXEMPLARS_KEY = "exemplars";
19+
public static final String TRACE_ID_KEY = "traceID";
20+
public static final String LABELS_KEY = "labels";
21+
public static final String TIMESTAMP_KEY = "timestamp";
22+
public static final String VALUE_KEY = "value";
1223
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
*
3+
* * Copyright OpenSearch Contributors
4+
* * SPDX-License-Identifier: Apache-2.0
5+
*
6+
*/
7+
8+
package org.opensearch.sql.prometheus.functions.implementation;
9+
10+
import static org.opensearch.sql.prometheus.functions.resolver.QueryRangeTableFunctionResolver.ENDTIME;
11+
import static org.opensearch.sql.prometheus.functions.resolver.QueryRangeTableFunctionResolver.QUERY;
12+
import static org.opensearch.sql.prometheus.functions.resolver.QueryRangeTableFunctionResolver.STARTTIME;
13+
14+
import java.util.List;
15+
import java.util.stream.Collectors;
16+
import org.opensearch.sql.data.model.ExprValue;
17+
import org.opensearch.sql.data.type.ExprCoreType;
18+
import org.opensearch.sql.data.type.ExprType;
19+
import org.opensearch.sql.exception.ExpressionEvaluationException;
20+
import org.opensearch.sql.expression.Expression;
21+
import org.opensearch.sql.expression.FunctionExpression;
22+
import org.opensearch.sql.expression.NamedArgumentExpression;
23+
import org.opensearch.sql.expression.env.Environment;
24+
import org.opensearch.sql.expression.function.FunctionName;
25+
import org.opensearch.sql.expression.function.TableFunctionImplementation;
26+
import org.opensearch.sql.prometheus.client.PrometheusClient;
27+
import org.opensearch.sql.prometheus.request.PrometheusQueryExemplarsRequest;
28+
import org.opensearch.sql.prometheus.storage.QueryExemplarsTable;
29+
import org.opensearch.sql.storage.Table;
30+
31+
public class QueryExemplarFunctionImplementation extends FunctionExpression implements
32+
TableFunctionImplementation {
33+
34+
private final FunctionName functionName;
35+
private final List<Expression> arguments;
36+
private final PrometheusClient prometheusClient;
37+
38+
/**
39+
* Required argument constructor.
40+
*
41+
* @param functionName name of the function
42+
* @param arguments a list of arguments provided
43+
*/
44+
public QueryExemplarFunctionImplementation(FunctionName functionName, List<Expression> arguments,
45+
PrometheusClient prometheusClient) {
46+
super(functionName, arguments);
47+
this.functionName = functionName;
48+
this.arguments = arguments;
49+
this.prometheusClient = prometheusClient;
50+
}
51+
52+
@Override
53+
public ExprValue valueOf(Environment<Expression, ExprValue> valueEnv) {
54+
throw new UnsupportedOperationException(String.format(
55+
"Prometheus defined function [%s] is only "
56+
+ "supported in SOURCE clause with prometheus connector catalog",
57+
functionName));
58+
}
59+
60+
@Override
61+
public ExprType type() {
62+
return ExprCoreType.STRUCT;
63+
}
64+
65+
@Override
66+
public String toString() {
67+
List<String> args = arguments.stream()
68+
.map(arg -> String.format("%s=%s", ((NamedArgumentExpression) arg)
69+
.getArgName(), ((NamedArgumentExpression) arg).getValue().toString()))
70+
.collect(Collectors.toList());
71+
return String.format("%s(%s)", functionName, String.join(", ", args));
72+
}
73+
74+
@Override
75+
public Table applyArguments() {
76+
return new QueryExemplarsTable(prometheusClient, buildExemplarsQueryRequest(arguments));
77+
}
78+
79+
private PrometheusQueryExemplarsRequest buildExemplarsQueryRequest(List<Expression> arguments) {
80+
81+
PrometheusQueryExemplarsRequest request = new PrometheusQueryExemplarsRequest();
82+
arguments.forEach(arg -> {
83+
String argName = ((NamedArgumentExpression) arg).getArgName();
84+
Expression argValue = ((NamedArgumentExpression) arg).getValue();
85+
ExprValue literalValue = argValue.valueOf();
86+
switch (argName) {
87+
case QUERY:
88+
request
89+
.setQuery((String) literalValue.value());
90+
break;
91+
case STARTTIME:
92+
request.setStartTime(((Number) literalValue.value()).longValue());
93+
break;
94+
case ENDTIME:
95+
request.setEndTime(((Number) literalValue.value()).longValue());
96+
break;
97+
default:
98+
throw new ExpressionEvaluationException(
99+
String.format("Invalid Function Argument:%s", argName));
100+
}
101+
});
102+
return request;
103+
}
104+
105+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.prometheus.functions.resolver;
7+
8+
import static org.opensearch.sql.data.type.ExprCoreType.LONG;
9+
import static org.opensearch.sql.data.type.ExprCoreType.STRING;
10+
import static org.opensearch.sql.prometheus.utils.TableFunctionUtils.getNamedArgumentsOfTableFunction;
11+
import static org.opensearch.sql.prometheus.utils.TableFunctionUtils.validatePrometheusTableFunctionArguments;
12+
13+
import java.util.List;
14+
import lombok.RequiredArgsConstructor;
15+
import org.apache.commons.lang3.tuple.Pair;
16+
import org.opensearch.sql.expression.Expression;
17+
import org.opensearch.sql.expression.function.FunctionBuilder;
18+
import org.opensearch.sql.expression.function.FunctionName;
19+
import org.opensearch.sql.expression.function.FunctionResolver;
20+
import org.opensearch.sql.expression.function.FunctionSignature;
21+
import org.opensearch.sql.prometheus.client.PrometheusClient;
22+
import org.opensearch.sql.prometheus.functions.implementation.QueryExemplarFunctionImplementation;
23+
24+
/**
25+
* This class is for query_exemplars table function resolver {@link FunctionResolver}.
26+
* It takes care of validating function arguments and also creating
27+
* required {@link org.opensearch.sql.expression.function.TableFunctionImplementation} Class.
28+
*/
29+
@RequiredArgsConstructor
30+
public class QueryExemplarsTableFunctionResolver implements FunctionResolver {
31+
32+
private final PrometheusClient prometheusClient;
33+
public static final String QUERY_EXEMPLARS = "query_exemplars";
34+
35+
public static final String QUERY = "query";
36+
public static final String STARTTIME = "starttime";
37+
public static final String ENDTIME = "endtime";
38+
39+
@Override
40+
public Pair<FunctionSignature, FunctionBuilder> resolve(FunctionSignature unresolvedSignature) {
41+
final FunctionName functionName = FunctionName.of(QUERY_EXEMPLARS);
42+
FunctionSignature functionSignature =
43+
new FunctionSignature(FunctionName.of(QUERY_EXEMPLARS), List.of(STRING, LONG, LONG));
44+
FunctionBuilder functionBuilder = (functionProperties, arguments) -> {
45+
final List<String> argumentNames = List.of(QUERY, STARTTIME, ENDTIME);
46+
validatePrometheusTableFunctionArguments(arguments, argumentNames);
47+
List<Expression> namedArguments = getNamedArgumentsOfTableFunction(arguments, argumentNames);
48+
return new QueryExemplarFunctionImplementation(functionName,
49+
namedArguments, prometheusClient);
50+
};
51+
return Pair.of(functionSignature, functionBuilder);
52+
}
53+
54+
@Override
55+
public FunctionName getFunctionName() {
56+
return FunctionName.of(QUERY_EXEMPLARS);
57+
}
58+
}

0 commit comments

Comments
 (0)