Skip to content

Commit 4102b58

Browse files
authored
Change query range response structure (opensearch-project#1867)
Signed-off-by: Vamsi Manohar <[email protected]>
1 parent f92e48c commit 4102b58

File tree

10 files changed

+115
-151
lines changed

10 files changed

+115
-151
lines changed

integ-test/src/test/java/org/opensearch/sql/ppl/PrometheusDataSourceCommandsIT.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,28 @@
77

88
package org.opensearch.sql.ppl;
99

10+
import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.LABELS;
1011
import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP;
1112
import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE;
13+
import static org.opensearch.sql.util.MatcherUtils.assertJsonEquals;
1214
import static org.opensearch.sql.util.MatcherUtils.schema;
1315
import static org.opensearch.sql.util.MatcherUtils.verifySchema;
1416

1517
import com.google.common.collect.ImmutableList;
1618
import com.google.common.collect.ImmutableMap;
19+
import com.google.common.io.Resources;
1720
import java.io.IOException;
21+
import java.net.URI;
22+
import java.nio.file.Files;
23+
import java.nio.file.Paths;
1824
import java.text.SimpleDateFormat;
1925
import java.util.Date;
26+
import lombok.Data;
2027
import lombok.SneakyThrows;
2128
import org.apache.commons.lang3.StringUtils;
2229
import org.json.JSONArray;
2330
import org.json.JSONObject;
2431
import org.junit.After;
25-
import org.junit.AfterClass;
2632
import org.junit.Assert;
2733
import org.junit.BeforeClass;
2834
import org.junit.jupiter.api.Assertions;
@@ -218,4 +224,33 @@ public void testMetricSumAggregationCommand() {
218224
}
219225
}
220226

227+
228+
@Test
229+
@SneakyThrows
230+
public void testQueryRange() {
231+
long currentTimestamp = new Date().getTime();
232+
JSONObject response =
233+
executeQuery("source=my_prometheus.query_range('prometheus_http_requests_total',"
234+
+ ((currentTimestamp/1000)-3600) + "," + currentTimestamp/1000 + ", " + 14 + ")" );
235+
verifySchema(response,
236+
schema(VALUE, "array"),
237+
schema(TIMESTAMP, "array"),
238+
schema(LABELS, "struct"));
239+
Assertions.assertTrue(response.getInt("size") > 0);
240+
}
241+
242+
@Test
243+
public void explainQueryRange() throws Exception {
244+
String expected = loadFromFile("expectedOutput/ppl/explain_query_range.json");
245+
assertJsonEquals(
246+
expected,
247+
explainQueryToString("source = my_prometheus"
248+
+ ".query_range('prometheus_http_requests_total',1689281439,1689291439,14)")
249+
);
250+
}
251+
252+
String loadFromFile(String filename) throws Exception {
253+
URI uri = Resources.getResource(filename).toURI();
254+
return new String(Files.readAllBytes(Paths.get(uri)));
255+
}
221256
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{
2+
"root": {
3+
"name": "QueryRangeFunctionTableScanOperator",
4+
"description": {
5+
"request": "query_range(prometheus_http_requests_total, 1689281439, 1689291439, 14)"
6+
},
7+
"children": []
8+
}
9+
}

prometheus/src/main/java/org/opensearch/sql/prometheus/functions/response/DefaultQueryRangeFunctionResponseHandle.java

Lines changed: 37 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -5,24 +5,25 @@
55

66
package org.opensearch.sql.prometheus.functions.response;
77

8+
import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.LABELS;
9+
import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP;
810
import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE;
911

1012
import java.time.Instant;
1113
import java.util.ArrayList;
1214
import java.util.Iterator;
1315
import java.util.LinkedHashMap;
1416
import java.util.List;
15-
import org.jetbrains.annotations.NotNull;
1617
import org.json.JSONArray;
1718
import org.json.JSONObject;
19+
import org.opensearch.sql.data.model.ExprCollectionValue;
1820
import org.opensearch.sql.data.model.ExprDoubleValue;
1921
import org.opensearch.sql.data.model.ExprStringValue;
2022
import org.opensearch.sql.data.model.ExprTimestampValue;
2123
import org.opensearch.sql.data.model.ExprTupleValue;
2224
import org.opensearch.sql.data.model.ExprValue;
2325
import org.opensearch.sql.data.type.ExprCoreType;
2426
import org.opensearch.sql.executor.ExecutionEngine;
25-
import org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants;
2627

2728
/**
2829
* Default implementation of QueryRangeFunctionResponseHandle.
@@ -40,63 +41,61 @@ public class DefaultQueryRangeFunctionResponseHandle implements QueryRangeFuncti
4041
*/
4142
public DefaultQueryRangeFunctionResponseHandle(JSONObject responseObject) {
4243
this.responseObject = responseObject;
43-
constructIteratorAndSchema();
44+
constructSchema();
45+
constructIterator();
4446
}
4547

46-
private void constructIteratorAndSchema() {
48+
private void constructIterator() {
4749
List<ExprValue> result = new ArrayList<>();
48-
List<ExecutionEngine.Schema.Column> columnList = new ArrayList<>();
4950
if ("matrix".equals(responseObject.getString("resultType"))) {
5051
JSONArray itemArray = responseObject.getJSONArray("result");
5152
for (int i = 0; i < itemArray.length(); i++) {
53+
LinkedHashMap<String, ExprValue> linkedHashMap = new LinkedHashMap<>();
5254
JSONObject item = itemArray.getJSONObject(i);
53-
JSONObject metric = item.getJSONObject("metric");
54-
JSONArray values = item.getJSONArray("values");
55-
if (i == 0) {
56-
columnList = getColumnList(metric);
57-
}
58-
for (int j = 0; j < values.length(); j++) {
59-
LinkedHashMap<String, ExprValue> linkedHashMap =
60-
extractRow(metric, values.getJSONArray(j), columnList);
61-
result.add(new ExprTupleValue(linkedHashMap));
62-
}
55+
linkedHashMap.put(LABELS, extractLabels(item.getJSONObject("metric")));
56+
extractTimestampAndValues(item.getJSONArray("values"), linkedHashMap);
57+
result.add(new ExprTupleValue(linkedHashMap));
6358
}
6459
} else {
6560
throw new RuntimeException(String.format("Unexpected Result Type: %s during Prometheus "
6661
+ "Response Parsing. 'matrix' resultType is expected",
6762
responseObject.getString("resultType")));
6863
}
69-
this.schema = new ExecutionEngine.Schema(columnList);
7064
this.responseIterator = result.iterator();
7165
}
7266

73-
@NotNull
74-
private static LinkedHashMap<String, ExprValue> extractRow(JSONObject metric,
75-
JSONArray values, List<ExecutionEngine.Schema.Column> columnList) {
76-
LinkedHashMap<String, ExprValue> linkedHashMap = new LinkedHashMap<>();
77-
for (ExecutionEngine.Schema.Column column : columnList) {
78-
if (PrometheusFieldConstants.TIMESTAMP.equals(column.getName())) {
79-
linkedHashMap.put(PrometheusFieldConstants.TIMESTAMP,
80-
new ExprTimestampValue(Instant.ofEpochMilli((long) (values.getDouble(0) * 1000))));
81-
} else if (column.getName().equals(VALUE)) {
82-
linkedHashMap.put(VALUE, new ExprDoubleValue(values.getDouble(1)));
83-
} else {
84-
linkedHashMap.put(column.getName(),
85-
new ExprStringValue(metric.getString(column.getName())));
86-
}
67+
private static void extractTimestampAndValues(JSONArray values,
68+
LinkedHashMap<String, ExprValue> linkedHashMap) {
69+
List<ExprValue> timestampList = new ArrayList<>();
70+
List<ExprValue> valueList = new ArrayList<>();
71+
for (int j = 0; j < values.length(); j++) {
72+
JSONArray value = values.getJSONArray(j);
73+
timestampList.add(new ExprTimestampValue(
74+
Instant.ofEpochMilli((long) (value.getDouble(0) * 1000))));
75+
valueList.add(new ExprDoubleValue(value.getDouble(1)));
8776
}
88-
return linkedHashMap;
77+
linkedHashMap.put(TIMESTAMP,
78+
new ExprCollectionValue(timestampList));
79+
linkedHashMap.put(VALUE, new ExprCollectionValue(valueList));
8980
}
9081

82+
private void constructSchema() {
83+
this.schema = new ExecutionEngine.Schema(getColumnList());
84+
}
9185

92-
private List<ExecutionEngine.Schema.Column> getColumnList(JSONObject metric) {
86+
private ExprValue extractLabels(JSONObject metric) {
87+
LinkedHashMap<String, ExprValue> labelsMap = new LinkedHashMap<>();
88+
metric.keySet().forEach(key
89+
-> labelsMap.put(key, new ExprStringValue(metric.getString(key))));
90+
return new ExprTupleValue(labelsMap);
91+
}
92+
93+
94+
private List<ExecutionEngine.Schema.Column> getColumnList() {
9395
List<ExecutionEngine.Schema.Column> columnList = new ArrayList<>();
94-
columnList.add(new ExecutionEngine.Schema.Column(PrometheusFieldConstants.TIMESTAMP,
95-
PrometheusFieldConstants.TIMESTAMP, ExprCoreType.TIMESTAMP));
96-
columnList.add(new ExecutionEngine.Schema.Column(VALUE, VALUE, ExprCoreType.DOUBLE));
97-
for (String key : metric.keySet()) {
98-
columnList.add(new ExecutionEngine.Schema.Column(key, key, ExprCoreType.STRING));
99-
}
96+
columnList.add(new ExecutionEngine.Schema.Column(TIMESTAMP, TIMESTAMP, ExprCoreType.ARRAY));
97+
columnList.add(new ExecutionEngine.Schema.Column(VALUE, VALUE, ExprCoreType.ARRAY));
98+
columnList.add(new ExecutionEngine.Schema.Column(LABELS, LABELS, ExprCoreType.STRUCT));
10099
return columnList;
101100
}
102101

prometheus/src/main/java/org/opensearch/sql/prometheus/response/PrometheusResponse.java

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@ public class PrometheusResponse implements Iterable<ExprValue> {
3535

3636
private final PrometheusResponseFieldNames prometheusResponseFieldNames;
3737

38-
private final Boolean isQueryRangeFunctionScan;
39-
4038
/**
4139
* Constructor.
4240
*
@@ -46,11 +44,9 @@ public class PrometheusResponse implements Iterable<ExprValue> {
4644
* and timestamp fieldName.
4745
*/
4846
public PrometheusResponse(JSONObject responseObject,
49-
PrometheusResponseFieldNames prometheusResponseFieldNames,
50-
Boolean isQueryRangeFunctionScan) {
47+
PrometheusResponseFieldNames prometheusResponseFieldNames) {
5148
this.responseObject = responseObject;
5249
this.prometheusResponseFieldNames = prometheusResponseFieldNames;
53-
this.isQueryRangeFunctionScan = isQueryRangeFunctionScan;
5450
}
5551

5652
@NonNull
@@ -70,24 +66,7 @@ public Iterator<ExprValue> iterator() {
7066
new ExprTimestampValue(Instant.ofEpochMilli((long) (val.getDouble(0) * 1000))));
7167
linkedHashMap.put(prometheusResponseFieldNames.getValueFieldName(), getValue(val, 1,
7268
prometheusResponseFieldNames.getValueType()));
73-
// Concept:
74-
// {\"instance\":\"localhost:9090\",\"__name__\":\"up\",\"job\":\"prometheus\"}"
75-
// This is the label string in the prometheus response.
76-
// Q: how do we map this to columns in a table.
77-
// For queries like source = prometheus.metric_name | ....
78-
// we can get the labels list in prior as we know which metric we are working on.
79-
// In case of commands like source = prometheus.query_range('promQL');
80-
// Any arbitrary command can be written and we don't know the labels
81-
// in the prometheus response in prior.
82-
// So for PPL like commands...output structure is @value, @timestamp
83-
// and each label is treated as a separate column where as in case of query_range
84-
// function irrespective of promQL, the output structure is
85-
// @value, @timestamp, @labels [jsonfied string of all the labels for a data point]
86-
if (isQueryRangeFunctionScan) {
87-
linkedHashMap.put(LABELS, new ExprStringValue(metric.toString()));
88-
} else {
89-
insertLabels(linkedHashMap, metric);
90-
}
69+
insertLabels(linkedHashMap, metric);
9170
result.add(new ExprTupleValue(linkedHashMap));
9271
}
9372
}

prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricScan.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,6 @@ public class PrometheusMetricScan extends TableScanOperator {
4040

4141
private Iterator<ExprValue> iterator;
4242

43-
@Setter
44-
@Getter
45-
private Boolean isQueryRangeFunctionScan = Boolean.FALSE;
46-
4743
@Setter
4844
private PrometheusResponseFieldNames prometheusResponseFieldNames;
4945

@@ -69,8 +65,7 @@ public void open() {
6965
JSONObject responseObject = prometheusClient.queryRange(
7066
request.getPromQl(),
7167
request.getStartTime(), request.getEndTime(), request.getStep());
72-
return new PrometheusResponse(responseObject, prometheusResponseFieldNames,
73-
isQueryRangeFunctionScan).iterator();
68+
return new PrometheusResponse(responseObject, prometheusResponseFieldNames).iterator();
7469
} catch (IOException e) {
7570
LOG.error(e.getMessage());
7671
throw new RuntimeException("Error fetching data from prometheus server. " + e.getMessage());

prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTable.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,6 @@ public Map<String, ExprType> getFieldTypes() {
9797
public PhysicalPlan implement(LogicalPlan plan) {
9898
PrometheusMetricScan metricScan =
9999
new PrometheusMetricScan(prometheusClient);
100-
if (prometheusQueryRequest != null) {
101-
metricScan.setRequest(prometheusQueryRequest);
102-
metricScan.setIsQueryRangeFunctionScan(Boolean.TRUE);
103-
}
104100
return plan.accept(new PrometheusDefaultImplementor(), metricScan);
105101
}
106102

prometheus/src/main/java/org/opensearch/sql/prometheus/storage/implementor/PrometheusDefaultImplementor.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -94,14 +94,12 @@ public PhysicalPlan visitIndexAggregation(PrometheusLogicalMetricAgg node,
9494
public PhysicalPlan visitRelation(LogicalRelation node,
9595
PrometheusMetricScan context) {
9696
PrometheusMetricTable prometheusMetricTable = (PrometheusMetricTable) node.getTable();
97-
if (prometheusMetricTable.getMetricName() != null) {
98-
String query = SeriesSelectionQueryBuilder.build(node.getRelationName(), null);
99-
context.getRequest().setPromQl(query);
100-
setTimeRangeParameters(null, context);
101-
context.getRequest()
102-
.setStep(StepParameterResolver.resolve(context.getRequest().getStartTime(),
103-
context.getRequest().getEndTime(), null));
104-
}
97+
String query = SeriesSelectionQueryBuilder.build(node.getRelationName(), null);
98+
context.getRequest().setPromQl(query);
99+
setTimeRangeParameters(null, context);
100+
context.getRequest()
101+
.setStep(StepParameterResolver.resolve(context.getRequest().getStartTime(),
102+
context.getRequest().getEndTime(), null));
105103
return context;
106104
}
107105

prometheus/src/test/java/org/opensearch/sql/prometheus/functions/scan/QueryRangeFunctionTableScanOperatorTest.java

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@
1515
import static org.opensearch.sql.prometheus.constants.TestConstants.QUERY;
1616
import static org.opensearch.sql.prometheus.constants.TestConstants.STARTTIME;
1717
import static org.opensearch.sql.prometheus.constants.TestConstants.STEP;
18+
import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.LABELS;
1819
import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP;
1920
import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE;
2021
import static org.opensearch.sql.prometheus.utils.TestUtils.getJson;
2122

2223
import java.io.IOException;
2324
import java.time.Instant;
2425
import java.util.ArrayList;
26+
import java.util.Collections;
2527
import java.util.LinkedHashMap;
2628
import lombok.SneakyThrows;
2729
import org.json.JSONObject;
@@ -30,17 +32,19 @@
3032
import org.junit.jupiter.api.extension.ExtendWith;
3133
import org.mockito.Mock;
3234
import org.mockito.junit.jupiter.MockitoExtension;
35+
import org.opensearch.sql.data.model.ExprCollectionValue;
3336
import org.opensearch.sql.data.model.ExprDoubleValue;
3437
import org.opensearch.sql.data.model.ExprStringValue;
3538
import org.opensearch.sql.data.model.ExprTimestampValue;
3639
import org.opensearch.sql.data.model.ExprTupleValue;
40+
import org.opensearch.sql.data.model.ExprValue;
3741
import org.opensearch.sql.data.type.ExprCoreType;
3842
import org.opensearch.sql.executor.ExecutionEngine;
3943
import org.opensearch.sql.prometheus.client.PrometheusClient;
4044
import org.opensearch.sql.prometheus.request.PrometheusQueryRequest;
4145

4246
@ExtendWith(MockitoExtension.class)
43-
public class QueryRangeFunctionTableScanOperatorTest {
47+
class QueryRangeFunctionTableScanOperatorTest {
4448
@Mock
4549
private PrometheusClient prometheusClient;
4650

@@ -61,22 +65,32 @@ void testQueryResponseIterator() {
6165
.thenReturn(new JSONObject(getJson("query_range_result.json")));
6266
queryRangeFunctionTableScanOperator.open();
6367
Assertions.assertTrue(queryRangeFunctionTableScanOperator.hasNext());
64-
ExprTupleValue firstRow = new ExprTupleValue(new LinkedHashMap<>() {{
65-
put(TIMESTAMP, new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L)));
66-
put(VALUE, new ExprDoubleValue(1));
68+
LinkedHashMap<String, ExprValue> labelsMap = new LinkedHashMap<>() {{
6769
put("instance", new ExprStringValue("localhost:9090"));
6870
put("__name__", new ExprStringValue("up"));
6971
put("job", new ExprStringValue("prometheus"));
72+
}};
73+
ExprTupleValue firstRow = new ExprTupleValue(new LinkedHashMap<>() {{
74+
put(LABELS, new ExprTupleValue(labelsMap));
75+
put(TIMESTAMP, new ExprCollectionValue(Collections
76+
.singletonList(new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L)))));
77+
put(VALUE, new ExprCollectionValue(Collections.singletonList(new ExprDoubleValue(1))));
7078
}
7179
});
80+
7281
assertEquals(firstRow, queryRangeFunctionTableScanOperator.next());
7382
Assertions.assertTrue(queryRangeFunctionTableScanOperator.hasNext());
74-
ExprTupleValue secondRow = new ExprTupleValue(new LinkedHashMap<>() {{
75-
put("@timestamp", new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L)));
76-
put("@value", new ExprDoubleValue(0));
83+
84+
LinkedHashMap<String, ExprValue> labelsMap2 = new LinkedHashMap<>() {{
7785
put("instance", new ExprStringValue("localhost:9091"));
7886
put("__name__", new ExprStringValue("up"));
7987
put("job", new ExprStringValue("node"));
88+
}};
89+
ExprTupleValue secondRow = new ExprTupleValue(new LinkedHashMap<>() {{
90+
put(LABELS, new ExprTupleValue(labelsMap2));
91+
put(TIMESTAMP, new ExprCollectionValue(Collections
92+
.singletonList(new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L)))));
93+
put(VALUE, new ExprCollectionValue(Collections.singletonList(new ExprDoubleValue(0))));
8094
}
8195
});
8296
assertEquals(secondRow, queryRangeFunctionTableScanOperator.next());
@@ -120,11 +134,9 @@ void testQuerySchema() {
120134
.thenReturn(new JSONObject(getJson("query_range_result.json")));
121135
queryRangeFunctionTableScanOperator.open();
122136
ArrayList<ExecutionEngine.Schema.Column> columns = new ArrayList<>();
123-
columns.add(new ExecutionEngine.Schema.Column(TIMESTAMP, TIMESTAMP, ExprCoreType.TIMESTAMP));
124-
columns.add(new ExecutionEngine.Schema.Column(VALUE, VALUE, ExprCoreType.DOUBLE));
125-
columns.add(new ExecutionEngine.Schema.Column("instance", "instance", ExprCoreType.STRING));
126-
columns.add(new ExecutionEngine.Schema.Column("__name__", "__name__", ExprCoreType.STRING));
127-
columns.add(new ExecutionEngine.Schema.Column("job", "job", ExprCoreType.STRING));
137+
columns.add(new ExecutionEngine.Schema.Column(TIMESTAMP, TIMESTAMP, ExprCoreType.ARRAY));
138+
columns.add(new ExecutionEngine.Schema.Column(VALUE, VALUE, ExprCoreType.ARRAY));
139+
columns.add(new ExecutionEngine.Schema.Column(LABELS, LABELS, ExprCoreType.STRUCT));
128140
ExecutionEngine.Schema expectedSchema = new ExecutionEngine.Schema(columns);
129141
assertEquals(expectedSchema, queryRangeFunctionTableScanOperator.schema());
130142
}

0 commit comments

Comments
 (0)