Skip to content

Commit bb63874

Browse files
committed
metric + test
Signed-off-by: alanprot <[email protected]>
1 parent e92e242 commit bb63874

File tree

3 files changed

+107
-3
lines changed

3 files changed

+107
-3
lines changed

pkg/distributor/distributor.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,10 +374,11 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
374374
}, []string{"user"}),
375375

376376
validateMetrics: validation.NewValidateMetrics(reg),
377+
asyncExecutor: util.NewNoOpExecutor(),
377378
}
378379

379380
if cfg.NumPushWorkers > 0 {
380-
d.asyncExecutor = util.NewWorkerPool(cfg.NumPushWorkers)
381+
d.asyncExecutor = util.NewWorkerPool("distributor", cfg.NumPushWorkers, reg)
381382
}
382383

383384
promauto.With(reg).NewGauge(prometheus.GaugeOpts{

pkg/util/worker_pool.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
package util
22

3-
import "sync"
3+
import (
4+
"github.com/prometheus/client_golang/prometheus/promauto"
5+
"sync"
6+
7+
"github.com/prometheus/client_golang/prometheus"
8+
)
49

510
// This code was based on: https://github.com/grpc/grpc-go/blob/66ba4b264d26808cb7af3c86eee66e843472915e/server.go
611

@@ -13,10 +18,13 @@ const serverWorkerResetThreshold = 1 << 16
1318

1419
type AsyncExecutor interface {
1520
Submit(f func())
21+
Stop()
1622
}
1723

1824
type noOpExecutor struct{}
1925

26+
func (n noOpExecutor) Stop() {}
27+
2028
func NewNoOpExecutor() AsyncExecutor {
2129
return &noOpExecutor{}
2230
}
@@ -28,11 +36,19 @@ func (n noOpExecutor) Submit(f func()) {
2836
type workerPoolExecutor struct {
2937
serverWorkerChannel chan func()
3038
closeOnce sync.Once
39+
40+
fallbackTotal prometheus.Counter
3141
}
3242

33-
func NewWorkerPool(numWorkers int) AsyncExecutor {
43+
func NewWorkerPool(name string, numWorkers int, reg prometheus.Registerer) AsyncExecutor {
3444
wp := &workerPoolExecutor{
3545
serverWorkerChannel: make(chan func()),
46+
fallbackTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
47+
Namespace: "cortex",
48+
Name: "worker_pool_fallback_total",
49+
Help: "The total number additional go routines that needed to be created to run jobs.",
50+
ConstLabels: prometheus.Labels{"name": name},
51+
}),
3652
}
3753

3854
for i := 0; i < numWorkers; i++ {
@@ -52,6 +68,7 @@ func (s *workerPoolExecutor) Submit(f func()) {
5268
select {
5369
case s.serverWorkerChannel <- f:
5470
default:
71+
s.fallbackTotal.Inc()
5572
go f()
5673
}
5774
}

pkg/util/worker_pool_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package util
2+
3+
import (
4+
"bytes"
5+
"github.com/prometheus/client_golang/prometheus"
6+
"github.com/prometheus/client_golang/prometheus/testutil"
7+
"github.com/stretchr/testify/require"
8+
"sync"
9+
"testing"
10+
)
11+
12+
func TestNewWorkerPool_CreateMultiplesPoolsWithSameRegistry(t *testing.T) {
13+
reg := prometheus.NewPedanticRegistry()
14+
wp1 := NewWorkerPool("test1", 100, reg)
15+
defer wp1.Stop()
16+
wp2 := NewWorkerPool("test2", 100, reg)
17+
defer wp2.Stop()
18+
}
19+
20+
func TestWorkerPool_TestMetric(t *testing.T) {
21+
reg := prometheus.NewPedanticRegistry()
22+
workerPool := NewWorkerPool("test1", 1, reg)
23+
defer workerPool.Stop()
24+
25+
require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
26+
# HELP cortex_worker_pool_fallback_total The total number additional go routines that needed to be created to run jobs.
27+
# TYPE cortex_worker_pool_fallback_total counter
28+
cortex_worker_pool_fallback_total{name="test1"} 0
29+
`), "cortex_worker_pool_fallback_total"))
30+
31+
wg := &sync.WaitGroup{}
32+
wg.Add(1)
33+
34+
// Block the first job
35+
workerPool.Submit(func() {
36+
wg.Wait()
37+
})
38+
39+
// Submit a second job while the only worker is blocked, so the fallback metric is incremented
40+
workerPool.Submit(func() {})
41+
require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
42+
# HELP cortex_worker_pool_fallback_total The total number additional go routines that needed to be created to run jobs.
43+
# TYPE cortex_worker_pool_fallback_total counter
44+
cortex_worker_pool_fallback_total{name="test1"} 1
45+
`), "cortex_worker_pool_fallback_total"))
46+
47+
wg.Done()
48+
}
49+
50+
func TestWorkerPool_ShouldFallbackWhenAllWorkersAreBusy(t *testing.T) {
51+
reg := prometheus.NewPedanticRegistry()
52+
numberOfWorkers := 10
53+
workerPool := NewWorkerPool("test1", numberOfWorkers, reg)
54+
defer workerPool.Stop()
55+
56+
m := sync.Mutex{}
57+
blockerWg := sync.WaitGroup{}
58+
blockerWg.Add(numberOfWorkers)
59+
60+
// Let's lock all submitted jobs
61+
m.Lock()
62+
63+
for i := 0; i < numberOfWorkers; i++ {
64+
workerPool.Submit(func() {
65+
defer blockerWg.Done()
66+
m.Lock()
67+
m.Unlock()
68+
})
69+
}
70+
71+
// At this point all workers should be busy. Let's try to create a new job.
72+
wg := sync.WaitGroup{}
73+
wg.Add(1)
74+
workerPool.Submit(func() {
75+
defer wg.Done()
76+
})
77+
78+
// Make sure the last job ran to the end
79+
wg.Wait()
80+
81+
// Let's release the jobs
82+
m.Unlock()
83+
84+
blockerWg.Wait()
85+
86+
}

0 commit comments

Comments
 (0)