Skip to content

Commit 4bbfa0c

Browse files
committed
metric + test
Signed-off-by: alanprot <[email protected]>
1 parent 45db5fd commit 4bbfa0c

File tree

4 files changed

+109
-4
lines changed

4 files changed

+109
-4
lines changed

pkg/distributor/distributor.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,10 +374,11 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
374374
}, []string{"user"}),
375375

376376
validateMetrics: validation.NewValidateMetrics(reg),
377+
asyncExecutor: util.NewNoOpExecutor(),
377378
}
378379

379380
if cfg.NumPushWorkers > 0 {
380-
d.asyncExecutor = util.NewWorkerPool(cfg.NumPushWorkers)
381+
d.asyncExecutor = util.NewWorkerPool("distributor", cfg.NumPushWorkers, reg)
381382
}
382383

383384
promauto.With(reg).NewGauge(prometheus.GaugeOpts{

pkg/ring/ring_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func benchmarkBatch(b *testing.B, g TokenGenerator, numInstances, numKeys int) {
8181
exe: noOpExecutor,
8282
},
8383
"workerExecutor": {
84-
exe: util.NewWorkerPool(100),
84+
exe: util.NewWorkerPool("test", 100, prometheus.NewPedanticRegistry()),
8585
},
8686
}
8787

pkg/util/worker_pool.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
package util
22

3-
import "sync"
3+
import (
4+
"sync"
5+
6+
"github.com/prometheus/client_golang/prometheus"
7+
"github.com/prometheus/client_golang/prometheus/promauto"
8+
)
49

510
// This code was based on: https://github.com/grpc/grpc-go/blob/66ba4b264d26808cb7af3c86eee66e843472915e/server.go
611

@@ -13,10 +18,13 @@ const serverWorkerResetThreshold = 1 << 16
1318

1419
type AsyncExecutor interface {
1520
Submit(f func())
21+
Stop()
1622
}
1723

1824
type noOpExecutor struct{}
1925

26+
func (n noOpExecutor) Stop() {}
27+
2028
func NewNoOpExecutor() AsyncExecutor {
2129
return &noOpExecutor{}
2230
}
@@ -28,11 +36,19 @@ func (n noOpExecutor) Submit(f func()) {
2836
type workerPoolExecutor struct {
2937
serverWorkerChannel chan func()
3038
closeOnce sync.Once
39+
40+
fallbackTotal prometheus.Counter
3141
}
3242

33-
func NewWorkerPool(numWorkers int) AsyncExecutor {
43+
func NewWorkerPool(name string, numWorkers int, reg prometheus.Registerer) AsyncExecutor {
3444
wp := &workerPoolExecutor{
3545
serverWorkerChannel: make(chan func()),
46+
fallbackTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
47+
Namespace: "cortex",
48+
Name: "worker_pool_fallback_total",
49+
Help: "The total number additional go routines that needed to be created to run jobs.",
50+
ConstLabels: prometheus.Labels{"name": name},
51+
}),
3652
}
3753

3854
for i := 0; i < numWorkers; i++ {
@@ -52,6 +68,7 @@ func (s *workerPoolExecutor) Submit(f func()) {
5268
select {
5369
case s.serverWorkerChannel <- f:
5470
default:
71+
s.fallbackTotal.Inc()
5572
go f()
5673
}
5774
}

pkg/util/worker_pool_test.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package util
2+
3+
import (
4+
"bytes"
5+
"sync"
6+
"testing"
7+
8+
"github.com/prometheus/client_golang/prometheus"
9+
"github.com/prometheus/client_golang/prometheus/testutil"
10+
"github.com/stretchr/testify/require"
11+
)
12+
13+
func TestNewWorkerPool_CreateMultiplesPoolsWithSameRegistry(t *testing.T) {
14+
reg := prometheus.NewPedanticRegistry()
15+
wp1 := NewWorkerPool("test1", 100, reg)
16+
defer wp1.Stop()
17+
wp2 := NewWorkerPool("test2", 100, reg)
18+
defer wp2.Stop()
19+
}
20+
21+
func TestWorkerPool_TestMetric(t *testing.T) {
22+
reg := prometheus.NewPedanticRegistry()
23+
workerPool := NewWorkerPool("test1", 1, reg)
24+
defer workerPool.Stop()
25+
26+
require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
27+
# HELP cortex_worker_pool_fallback_total The total number additional go routines that needed to be created to run jobs.
28+
# TYPE cortex_worker_pool_fallback_total counter
29+
cortex_worker_pool_fallback_total{name="test1"} 0
30+
`), "cortex_worker_pool_fallback_total"))
31+
32+
wg := &sync.WaitGroup{}
33+
wg.Add(1)
34+
35+
// Block the first job
36+
workerPool.Submit(func() {
37+
wg.Wait()
38+
})
39+
40+
// create an extra job to increment the metric
41+
workerPool.Submit(func() {})
42+
require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
43+
# HELP cortex_worker_pool_fallback_total The total number additional go routines that needed to be created to run jobs.
44+
# TYPE cortex_worker_pool_fallback_total counter
45+
cortex_worker_pool_fallback_total{name="test1"} 1
46+
`), "cortex_worker_pool_fallback_total"))
47+
48+
wg.Done()
49+
}
50+
51+
func TestWorkerPool_ShouldFallbackWhenAllWorkersAreBusy(t *testing.T) {
52+
reg := prometheus.NewPedanticRegistry()
53+
numberOfWorkers := 10
54+
workerPool := NewWorkerPool("test1", numberOfWorkers, reg)
55+
defer workerPool.Stop()
56+
57+
m := sync.Mutex{}
58+
blockerWg := sync.WaitGroup{}
59+
blockerWg.Add(numberOfWorkers)
60+
61+
// Lets lock all submited jobs
62+
m.Lock()
63+
64+
for i := 0; i < numberOfWorkers; i++ {
65+
workerPool.Submit(func() {
66+
defer blockerWg.Done()
67+
m.Lock()
68+
m.Unlock() //nolint:staticcheck
69+
})
70+
}
71+
72+
// At this point all workers should be busy. lets try to create a new job
73+
wg := sync.WaitGroup{}
74+
wg.Add(1)
75+
workerPool.Submit(func() {
76+
defer wg.Done()
77+
})
78+
79+
// Make sure the last job ran to the end
80+
wg.Wait()
81+
82+
// Lets release the jobs
83+
m.Unlock()
84+
85+
blockerWg.Wait()
86+
87+
}

0 commit comments

Comments
 (0)