Commit e31817d

Batch adding series to query limiter to optimize locks
Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>
1 parent f791f5b commit e31817d
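
The change is a lock-contention optimization: `QueryLimiter.AddSeries` previously took the limiter's mutex once per series, so a large response handled by many concurrent goroutines paid one lock/unlock cycle per series. The new variadic signature lets callers hand over a whole batch of series and pay the lock cost once per response. Below is a minimal, self-contained sketch of the two locking patterns; the `limiter` type and its methods are simplified stand-ins written for illustration, not the real Cortex `QueryLimiter`.

package main

import (
    "fmt"
    "sync"
)

// limiter is a simplified stand-in for Cortex's QueryLimiter: a mutex-guarded
// set of series fingerprints with a maximum size.
type limiter struct {
    mx        sync.Mutex
    unique    map[uint64]struct{}
    maxSeries int
}

// addOne mirrors the old pattern: one lock acquisition per series.
func (l *limiter) addOne(fp uint64) error {
    l.mx.Lock()
    defer l.mx.Unlock()
    l.unique[fp] = struct{}{}
    if len(l.unique) > l.maxSeries {
        return fmt.Errorf("max series limit of %d exceeded", l.maxSeries)
    }
    return nil
}

// addBatch mirrors the new pattern: one lock acquisition per batch.
func (l *limiter) addBatch(fps ...uint64) error {
    l.mx.Lock()
    defer l.mx.Unlock()
    for _, fp := range fps {
        l.unique[fp] = struct{}{}
    }
    if len(l.unique) > l.maxSeries {
        return fmt.Errorf("max series limit of %d exceeded", l.maxSeries)
    }
    return nil
}

func main() {
    l := &limiter{unique: map[uint64]struct{}{}, maxSeries: 3}
    for fp := uint64(1); fp <= 3; fp++ {
        _ = l.addOne(fp) // old pattern: three lock acquisitions
    }
    fmt.Println(l.addBatch(4, 5)) // new pattern: one lock acquisition; the limit of 3 is now exceeded
}

The actual patch applies the same idea inside the limiter: per-series fingerprints are computed for the whole batch and inserted under a single lock, as the pkg/util/limiter/query_limiter.go diff below shows.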

File tree

5 files changed (+106 lines, -35 lines)

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -48,6 +48,7 @@
 * [ENHANCEMENT] Querier: Retry store gateway on different zones when zone awareness is enabled. #5476
 * [ENHANCEMENT] DDBKV: Change metric name from dynamodb_kv_read_capacity_total to dynamodb_kv_consumed_capacity_total and include Delete, Put, Batch dimension. #5481
 * [ENHANCEMENT] Compactor: allow unregisteronshutdown to be configurable. #5503
+* [ENHANCEMENT] Querier: Batch adding series to query limiter to optimize locking. #5505
 * [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
 * [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
 * [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293

pkg/distributor/distributor.go

Lines changed: 10 additions & 10 deletions
@@ -1093,17 +1093,18 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
         if err := queryLimiter.AddDataBytes(resp.Size()); err != nil {
             return nil, validation.LimitError(err.Error())
         }
+        s := make([][]cortexpb.LabelAdapter, 0, len(resp.Metric))
         for _, m := range resp.Metric {
-            if err := queryLimiter.AddSeries(m.Labels); err != nil {
-                return nil, validation.LimitError(err.Error())
-            }
+            s = append(s, m.Labels)
             m := cortexpb.FromLabelAdaptersToMetric(m.Labels)
             fingerprint := m.Fingerprint()
             mutex.Lock()
             (*metrics)[fingerprint] = m
             mutex.Unlock()
         }
-
+        if err := queryLimiter.AddSeries(s...); err != nil {
+            return nil, validation.LimitError(err.Error())
+        }
         return nil, nil
     })

@@ -1130,19 +1131,18 @@ func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, t
         } else if err != nil {
             return nil, err
         }
-
+        s := make([][]cortexpb.LabelAdapter, 0, len(resp.Metric))
         for _, metric := range resp.Metric {
             m := cortexpb.FromLabelAdaptersToMetricWithCopy(metric.Labels)
-
-            if err := queryLimiter.AddSeries(metric.Labels); err != nil {
-                return nil, validation.LimitError(err.Error())
-            }
-
+            s = append(s, metric.Labels)
             fingerprint := m.Fingerprint()
             mutex.Lock()
             (*metrics)[fingerprint] = m
             mutex.Unlock()
         }
+        if err := queryLimiter.AddSeries(s...); err != nil {
+            return nil, validation.LimitError(err.Error())
+        }
     }

     return nil, nil
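
Both hunks above follow the same caller-side shape: label sets are collected into a slice while the response is iterated, and the limiter is consulted once after the loop via `queryLimiter.AddSeries(s...)`. One consequence is that the series limit is now enforced after a response has been fully walked rather than at the first offending series; the trade-off accepted here is one lock acquisition per response instead of one per series. The queryIngesterStream change in pkg/distributor/query.go below follows the same pattern, additionally merging the Chunkseries and Timeseries labels into a single batch.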

pkg/distributor/query.go

Lines changed: 10 additions & 9 deletions
@@ -326,10 +326,17 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
             return nil, validation.LimitError(chunkLimitErr.Error())
         }

+        s := make([][]cortexpb.LabelAdapter, 0, len(resp.Chunkseries)+len(resp.Timeseries))
         for _, series := range resp.Chunkseries {
-            if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
-                return nil, validation.LimitError(limitErr.Error())
-            }
+            s = append(s, series.Labels)
+        }
+
+        for _, series := range resp.Timeseries {
+            s = append(s, series.Labels)
+        }
+
+        if limitErr := queryLimiter.AddSeries(s...); limitErr != nil {
+            return nil, validation.LimitError(limitErr.Error())
         }

         if chunkBytesLimitErr := queryLimiter.AddChunkBytes(resp.ChunksSize()); chunkBytesLimitErr != nil {

@@ -340,12 +347,6 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
             return nil, validation.LimitError(dataBytesLimitErr.Error())
         }

-        for _, series := range resp.Timeseries {
-            if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
-                return nil, validation.LimitError(limitErr.Error())
-            }
-        }
-
         result.Chunkseries = append(result.Chunkseries, resp.Chunkseries...)
         result.Timeseries = append(result.Timeseries, resp.Timeseries...)
     }

pkg/util/limiter/query_limiter.go

Lines changed: 9 additions & 4 deletions
@@ -65,18 +65,23 @@ func QueryLimiterFromContextWithFallback(ctx context.Context) *QueryLimiter {
     return ql
 }

-// AddSeries adds the input series and returns an error if the limit is reached.
-func (ql *QueryLimiter) AddSeries(seriesLabels []cortexpb.LabelAdapter) error {
+// AddSeries adds the batch of input series and returns an error if the limit is reached.
+func (ql *QueryLimiter) AddSeries(series ...[]cortexpb.LabelAdapter) error {
     // If the max series is unlimited just return without managing map
     if ql.maxSeriesPerQuery == 0 {
         return nil
     }
-    fingerprint := client.FastFingerprint(seriesLabels)
+    fps := make([]model.Fingerprint, 0, len(series))
+    for _, s := range series {
+        fps = append(fps, client.FastFingerprint(s))
+    }

     ql.uniqueSeriesMx.Lock()
     defer ql.uniqueSeriesMx.Unlock()
+    for _, fp := range fps {
+        ql.uniqueSeries[fp] = struct{}{}
+    }

-    ql.uniqueSeries[fingerprint] = struct{}{}
     if len(ql.uniqueSeries) > ql.maxSeriesPerQuery {
         // Format error with max limit
         return fmt.Errorf(ErrMaxSeriesHit, ql.maxSeriesPerQuery)
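
A detail worth noting in the new `AddSeries`: the `client.FastFingerprint` calls for the whole batch happen before the mutex is taken, so the hashing work stays outside the critical section and the lock is held only for the map inserts and the final size check.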

pkg/util/limiter/query_limiter_test.go

Lines changed: 76 additions & 12 deletions
@@ -2,6 +2,7 @@ package limiter

 import (
     "fmt"
+    "sync"
     "testing"

     "github.com/prometheus/prometheus/model/labels"

@@ -87,6 +88,37 @@ func TestQueryLimiter_AddSeriers_ShouldReturnErrorOnLimitExceeded(t *testing.T)
     require.Error(t, err)
 }

+func TestQueryLimiter_AddSeriesBatch_ShouldReturnErrorOnLimitExceeded(t *testing.T) {
+    const (
+        metricName = "test_metric"
+    )
+
+    limiter := NewQueryLimiter(10, 0, 0, 0)
+    series := make([][]cortexpb.LabelAdapter, 0, 10)
+
+    for i := 0; i < 10; i++ {
+        s := []cortexpb.LabelAdapter{
+            {
+                Name:  labels.MetricName,
+                Value: fmt.Sprintf("%v_%v", metricName, i),
+            },
+        }
+        series = append(series, s)
+    }
+    err := limiter.AddSeries(series...)
+    require.NoError(t, err)
+
+    series1 := []cortexpb.LabelAdapter{
+        {
+            Name:  labels.MetricName,
+            Value: metricName + "_11",
+        },
+    }
+
+    err = limiter.AddSeries(series1)
+    require.Error(t, err)
+}
+
 func TestQueryLimiter_AddChunkBytes(t *testing.T) {
     var limiter = NewQueryLimiter(0, 100, 0, 0)

@@ -106,23 +138,55 @@ func TestQueryLimiter_AddDataBytes(t *testing.T) {
 }

 func BenchmarkQueryLimiter_AddSeries(b *testing.B) {
+    AddSeriesConcurrentBench(b, 1)
+}
+
+func BenchmarkQueryLimiter_AddSeriesBatch(b *testing.B) {
+    AddSeriesConcurrentBench(b, 128)
+}
+
+func AddSeriesConcurrentBench(b *testing.B, batchSize int) {
+    b.ResetTimer()
     const (
         metricName = "test_metric"
     )
-    var series []labels.Labels
-    for i := 0; i < b.N; i++ {
-        series = append(series,
-            labels.FromMap(map[string]string{
-                labels.MetricName: metricName + "_1",
-                "series1":         fmt.Sprint(i),
-            }))
-    }
-    b.ResetTimer()

     limiter := NewQueryLimiter(b.N+1, 0, 0, 0)
-    for _, s := range series {
-        err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s))
-        assert.NoError(b, err)
+
+    // Concurrent goroutines trying to add duplicated series
+    const numWorkers = 100
+    var wg sync.WaitGroup
+
+    worker := func(w int) {
+        defer wg.Done()
+        var series []labels.Labels
+        for i := 0; i < b.N; i++ {
+            series = append(series,
+                labels.FromMap(map[string]string{
+                    labels.MetricName: metricName + "_1",
+                    "series1":         fmt.Sprint(i),
+                }))
+        }
+
+        for i := 0; i < len(series); i += batchSize {
+            s := make([][]cortexpb.LabelAdapter, 0, batchSize)
+            j := i + batchSize
+            if j > len(series) {
+                j = len(series)
+            }
+            for k := i; k < j; k++ {
+                s = append(s, cortexpb.FromLabelsToLabelAdapters(series[k]))
+            }
+
+            err := limiter.AddSeries(s...)
+            assert.NoError(b, err)
+        }
+    }
+
+    for w := 1; w <= numWorkers; w++ {
+        wg.Add(1)
+        go worker(w)
     }

+    wg.Wait()
 }