Skip to content

Commit 6c41a64

Browse files
authored
Query Frontend: Handle context error before decoding and merging responses (#5499)
* Add context check in MergeResponse Signed-off-by: Justin Jung <[email protected]> * Add context check in DecodeResponse Signed-off-by: Justin Jung <[email protected]> * Fix context passed to MergeResponse from results_cache Signed-off-by: Justin Jung <[email protected]> * Added tests Signed-off-by: Justin Jung <[email protected]> * Add changelog Signed-off-by: Justin Jung <[email protected]> * Fix tests Signed-off-by: Justin Jung <[email protected]> * Lint Signed-off-by: Justin Jung <[email protected]> --------- Signed-off-by: Justin Jung <[email protected]>
1 parent d84394d commit 6c41a64

File tree

6 files changed

+158
-36
lines changed

6 files changed

+158
-36
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@
6161
* [BUGFIX] Alertmanager: Remove the user id from state replication key metric label value. #5453
6262
* [BUGFIX] Compactor: Avoid cleaner concurrency issues checking global markers before all blocks. #5457
6363
* [BUGFIX] DDBKV: Disallow instance with older timestamp to update instance with newer timestamp. #5480
64+
* [BUGFIX] Query Frontend: Handle context error before decoding and merging responses. #5499
65+
6466
## 1.15.1 2023-04-26
6567

6668
* [CHANGE] Alertmanager: Validating new fields on the PagerDuty AM config. #5290

pkg/querier/tripperware/instantquery/instant_query.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,10 @@ func (instantQueryCodec) DecodeResponse(ctx context.Context, r *http.Response, _
158158
log, ctx := spanlogger.New(ctx, "PrometheusInstantQueryResponse") //nolint:ineffassign,staticcheck
159159
defer log.Finish()
160160

161+
if err := ctx.Err(); err != nil {
162+
return nil, err
163+
}
164+
161165
buf, err := tripperware.BodyBuffer(r, log)
162166
if err != nil {
163167
log.Error(err)
@@ -266,7 +270,7 @@ func (instantQueryCodec) MergeResponse(ctx context.Context, req tripperware.Requ
266270
// For now, we only shard queries that returns a vector.
267271
switch promResponses[0].Data.ResultType {
268272
case model.ValVector.String():
269-
v, err := vectorMerge(req, promResponses)
273+
v, err := vectorMerge(ctx, req, promResponses)
270274
if err != nil {
271275
return nil, err
272276
}
@@ -280,12 +284,17 @@ func (instantQueryCodec) MergeResponse(ctx context.Context, req tripperware.Requ
280284
Stats: statsMerge(promResponses),
281285
}
282286
case model.ValMatrix.String():
287+
sampleStreams, err := matrixMerge(ctx, promResponses)
288+
if err != nil {
289+
return nil, err
290+
}
291+
283292
data = PrometheusInstantQueryData{
284293
ResultType: model.ValMatrix.String(),
285294
Result: PrometheusInstantQueryResult{
286295
Result: &PrometheusInstantQueryResult_Matrix{
287296
Matrix: &Matrix{
288-
SampleStreams: matrixMerge(promResponses),
297+
SampleStreams: sampleStreams,
289298
},
290299
},
291300
},
@@ -302,7 +311,7 @@ func (instantQueryCodec) MergeResponse(ctx context.Context, req tripperware.Requ
302311
return res, nil
303312
}
304313

305-
func vectorMerge(req tripperware.Request, resps []*PrometheusInstantQueryResponse) (*Vector, error) {
314+
func vectorMerge(ctx context.Context, req tripperware.Request, resps []*PrometheusInstantQueryResponse) (*Vector, error) {
306315
output := map[string]*Sample{}
307316
metrics := []string{} // Used to preserve the order for topk and bottomk.
308317
sortPlan, err := sortPlanForQuery(req.GetQuery())
@@ -311,6 +320,9 @@ func vectorMerge(req tripperware.Request, resps []*PrometheusInstantQueryRespons
311320
}
312321
buf := make([]byte, 0, 1024)
313322
for _, resp := range resps {
323+
if err = ctx.Err(); err != nil {
324+
return nil, err
325+
}
314326
if resp == nil {
315327
continue
316328
}
@@ -439,9 +451,12 @@ func sortPlanForQuery(q string) (sortPlan, error) {
439451
return sortByLabels, nil
440452
}
441453

442-
func matrixMerge(resps []*PrometheusInstantQueryResponse) []tripperware.SampleStream {
454+
func matrixMerge(ctx context.Context, resps []*PrometheusInstantQueryResponse) ([]tripperware.SampleStream, error) {
443455
output := make(map[string]tripperware.SampleStream)
444456
for _, resp := range resps {
457+
if err := ctx.Err(); err != nil {
458+
return nil, err
459+
}
445460
if resp == nil {
446461
continue
447462
}
@@ -462,7 +477,7 @@ func matrixMerge(resps []*PrometheusInstantQueryResponse) []tripperware.SampleSt
462477
result = append(result, output[key])
463478
}
464479

465-
return result
480+
return result, nil
466481
}
467482

468483
// NewEmptyPrometheusInstantQueryResponse returns an empty successful Prometheus query range response.

pkg/querier/tripperware/instantquery/instant_query_test.go

Lines changed: 50 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -218,11 +218,14 @@ func TestMergeResponse(t *testing.T) {
218218
Query: "sum(up)",
219219
}
220220
for _, tc := range []struct {
221-
name string
222-
req tripperware.Request
223-
resps []string
224-
expectedResp string
225-
expectedErr error
221+
name string
222+
req tripperware.Request
223+
resps []string
224+
expectedResp string
225+
expectedErr error
226+
cancelBeforeDecode bool
227+
expectedDecodeErr error
228+
cancelBeforeMerge bool
226229
}{
227230
{
228231
name: "empty response",
@@ -355,10 +358,31 @@ func TestMergeResponse(t *testing.T) {
355358
},
356359
expectedResp: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"bar"},"values":[[1,"1"],[2,"2"],[3,"3"]]}]}}`,
357360
},
361+
{
362+
name: "context cancelled before decoding response",
363+
req: defaultReq,
364+
resps: []string{
365+
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]}]}}`,
366+
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"bar"},"value":[2,"2"]}]}}`,
367+
},
368+
expectedDecodeErr: context.Canceled,
369+
cancelBeforeDecode: true,
370+
},
371+
{
372+
name: "context cancelled before merging response",
373+
req: defaultReq,
374+
resps: []string{
375+
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]}]}}`,
376+
`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"bar"},"value":[2,"2"]}]}}`,
377+
},
378+
expectedErr: context.Canceled,
379+
cancelBeforeMerge: true,
380+
},
358381
} {
359382
tc := tc
360383
t.Run(tc.name, func(t *testing.T) {
361384
t.Parallel()
385+
ctx, cancelCtx := context.WithCancel(context.Background())
362386

363387
var resps []tripperware.Response
364388
for _, r := range tc.resps {
@@ -367,20 +391,34 @@ func TestMergeResponse(t *testing.T) {
367391
Header: http.Header{"Content-Type": []string{"application/json"}},
368392
Body: io.NopCloser(bytes.NewBuffer([]byte(r))),
369393
}
370-
dr, err := InstantQueryCodec.DecodeResponse(context.Background(), hr, nil)
371-
require.NoError(t, err)
394+
395+
if tc.cancelBeforeDecode {
396+
cancelCtx()
397+
}
398+
dr, err := InstantQueryCodec.DecodeResponse(ctx, hr, nil)
399+
assert.Equal(t, tc.expectedDecodeErr, err)
400+
if err != nil {
401+
cancelCtx()
402+
return
403+
}
372404
resps = append(resps, dr)
373405
}
374-
resp, err := InstantQueryCodec.MergeResponse(context.Background(), tc.req, resps...)
375-
assert.Equal(t, err, tc.expectedErr)
406+
407+
if tc.cancelBeforeMerge {
408+
cancelCtx()
409+
}
410+
resp, err := InstantQueryCodec.MergeResponse(ctx, tc.req, resps...)
411+
assert.Equal(t, tc.expectedErr, err)
376412
if err != nil {
413+
cancelCtx()
377414
return
378415
}
379-
dr, err := InstantQueryCodec.EncodeResponse(context.Background(), resp)
380-
assert.Equal(t, err, tc.expectedErr)
416+
dr, err := InstantQueryCodec.EncodeResponse(ctx, resp)
417+
assert.Equal(t, tc.expectedErr, err)
381418
contents, err := io.ReadAll(dr.Body)
382-
assert.Equal(t, err, tc.expectedErr)
419+
assert.Equal(t, tc.expectedErr, err)
383420
assert.Equal(t, string(contents), tc.expectedResp)
421+
cancelCtx()
384422
})
385423
}
386424
}

pkg/querier/tripperware/queryrange/query_range.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -145,12 +145,16 @@ func (c prometheusCodec) MergeResponse(ctx context.Context, _ tripperware.Reques
145145

146146
// Merge the responses.
147147
sort.Sort(byFirstTime(promResponses))
148+
sampleStreams, err := matrixMerge(ctx, promResponses)
149+
if err != nil {
150+
return nil, err
151+
}
148152

149153
response := PrometheusResponse{
150154
Status: StatusSuccess,
151155
Data: PrometheusData{
152156
ResultType: model.ValMatrix.String(),
153-
Result: matrixMerge(promResponses),
157+
Result: sampleStreams,
154158
Stats: statsMerge(c.sharded, promResponses),
155159
},
156160
}
@@ -263,6 +267,10 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ t
263267
log, ctx := spanlogger.New(ctx, "ParseQueryRangeResponse") //nolint:ineffassign,staticcheck
264268
defer log.Finish()
265269

270+
if err := ctx.Err(); err != nil {
271+
return nil, err
272+
}
273+
266274
buf, err := tripperware.BodyBuffer(r, log)
267275
if err != nil {
268276
log.Error(err)
@@ -347,9 +355,12 @@ func statsMerge(shouldSumStats bool, resps []*PrometheusResponse) *tripperware.P
347355
return tripperware.StatsMerge(output)
348356
}
349357

350-
func matrixMerge(resps []*PrometheusResponse) []tripperware.SampleStream {
358+
func matrixMerge(ctx context.Context, resps []*PrometheusResponse) ([]tripperware.SampleStream, error) {
351359
output := make(map[string]tripperware.SampleStream)
352360
for _, resp := range resps {
361+
if err := ctx.Err(); err != nil {
362+
return nil, err
363+
}
353364
if resp == nil {
354365
continue
355366
}
@@ -367,7 +378,7 @@ func matrixMerge(resps []*PrometheusResponse) []tripperware.SampleStream {
367378
result = append(result, output[key])
368379
}
369380

370-
return result
381+
return result, nil
371382
}
372383

373384
func parseDurationMs(s string) (int64, error) {

0 commit comments

Comments
 (0)