Skip to content

Commit d974032

Browse files
committed
Reusing the grpc client to peform healthcheck
Signed-off-by: alanprot <[email protected]>
1 parent c6347f0 commit d974032

File tree

2 files changed

+74
-32
lines changed

2 files changed

+74
-32
lines changed

pkg/util/grpcclient/health_check.go

Lines changed: 70 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package grpcclient
22

33
import (
44
"context"
5-
"errors"
65
"flag"
76
"fmt"
87
"io"
@@ -11,41 +10,49 @@ import (
1110

1211
"github.com/go-kit/log"
1312
"github.com/go-kit/log/level"
13+
"github.com/gogo/status"
1414
"github.com/weaveworks/common/user"
1515
"go.uber.org/atomic"
1616
"google.golang.org/grpc"
17+
"google.golang.org/grpc/codes"
1718
"google.golang.org/grpc/health/grpc_health_v1"
1819

1920
"github.com/cortexproject/cortex/pkg/util/services"
2021
)
2122

2223
var (
23-
unhealthyErr = errors.New("instance marked as unhealthy")
24+
unhealthyErr = status.Error(codes.Unavailable, "instance marked as unhealthy")
2425
)
2526

2627
type HealthCheckConfig struct {
2728
*HealthCheckInterceptors `yaml:"-"`
2829

29-
UnhealthyThreshold int `yaml:"unhealthy_threshold"`
30+
UnhealthyThreshold int64 `yaml:"unhealthy_threshold"`
3031
Interval time.Duration `yaml:"interval"`
3132
Timeout time.Duration `yaml:"timeout"`
3233
}
3334

3435
// RegisterFlagsWithPrefix for Config.
3536
func (cfg *HealthCheckConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
36-
f.IntVar(&cfg.UnhealthyThreshold, prefix+".unhealthy-threshold", 0, "The number of consecutive failed health checks required before considering a target unhealthy. 0 means disabled.")
37+
f.Int64Var(&cfg.UnhealthyThreshold, prefix+".unhealthy-threshold", 0, "The number of consecutive failed health checks required before considering a target unhealthy. 0 means disabled.")
3738
f.DurationVar(&cfg.Timeout, prefix+".timeout", 1*time.Second, "The amount of time during which no response from a target means a failed health check.")
3839
f.DurationVar(&cfg.Interval, prefix+".interval", 5*time.Second, "The approximate amount of time between health checks of an individual target.")
3940
}
4041

41-
type healthCheckEntry struct {
42-
address string
43-
clientConfig *ConfigWithHealthCheck
42+
type healthCheckClient struct {
43+
grpc_health_v1.HealthClient
44+
io.Closer
45+
}
4446

45-
sync.RWMutex
46-
unhealthyCount int
47+
type healthCheckEntry struct {
48+
address string
49+
clientConfig *ConfigWithHealthCheck
4750
lastCheckTime atomic.Time
4851
lastTickTime atomic.Time
52+
unhealthyCount atomic.Int64
53+
54+
healthCheckClientMutex sync.RWMutex
55+
healthCheckClient *healthCheckClient
4956
}
5057

5158
type HealthCheckInterceptors struct {
@@ -75,18 +82,14 @@ func NewHealthCheckInterceptors(logger log.Logger) *HealthCheckInterceptors {
7582
}
7683

7784
func (e *healthCheckEntry) isHealthy() bool {
78-
e.RLock()
79-
defer e.RUnlock()
80-
return e.unhealthyCount < e.clientConfig.HealthCheckConfig.UnhealthyThreshold
85+
return e.unhealthyCount.Load() < e.clientConfig.HealthCheckConfig.UnhealthyThreshold
8186
}
8287

8388
func (e *healthCheckEntry) recordHealth(err error) error {
84-
e.Lock()
85-
defer e.Unlock()
8689
if err != nil {
87-
e.unhealthyCount++
90+
e.unhealthyCount.Inc()
8891
} else {
89-
e.unhealthyCount = 0
92+
e.unhealthyCount.Store(0)
9093
}
9194

9295
return err
@@ -96,6 +99,51 @@ func (e *healthCheckEntry) tick() {
9699
e.lastTickTime.Store(time.Now())
97100
}
98101

102+
func (e *healthCheckEntry) close() error {
103+
e.healthCheckClientMutex.Lock()
104+
defer e.healthCheckClientMutex.Unlock()
105+
106+
if e.healthCheckClient != nil {
107+
err := e.healthCheckClient.Close()
108+
e.healthCheckClient = nil
109+
return err
110+
}
111+
112+
return nil
113+
}
114+
115+
func (e *healthCheckEntry) getClient(factory func(cc *grpc.ClientConn) (grpc_health_v1.HealthClient, io.Closer)) (*healthCheckClient, error) {
116+
e.healthCheckClientMutex.RLock()
117+
c := e.healthCheckClient
118+
e.healthCheckClientMutex.RUnlock()
119+
120+
if c != nil {
121+
return c, nil
122+
}
123+
124+
e.healthCheckClientMutex.Lock()
125+
defer e.healthCheckClientMutex.Unlock()
126+
127+
if e.healthCheckClient == nil {
128+
dialOpts, err := e.clientConfig.Config.DialOption(nil, nil)
129+
if err != nil {
130+
return nil, err
131+
}
132+
conn, err := grpc.NewClient(e.address, dialOpts...)
133+
if err != nil {
134+
return nil, err
135+
}
136+
137+
client, closer := factory(conn)
138+
e.healthCheckClient = &healthCheckClient{
139+
HealthClient: client,
140+
Closer: closer,
141+
}
142+
}
143+
144+
return e.healthCheckClient, nil
145+
}
146+
99147
func (h *HealthCheckInterceptors) registeredInstances() []*healthCheckEntry {
100148
h.RLock()
101149
defer h.RUnlock()
@@ -112,6 +160,9 @@ func (h *HealthCheckInterceptors) iteration(ctx context.Context) error {
112160
for _, instance := range h.registeredInstances() {
113161
if time.Since(instance.lastTickTime.Load()) >= h.instanceGcTimeout {
114162
h.Lock()
163+
if err := instance.close(); err != nil {
164+
level.Warn(h.logger).Log("msg", "Error closing health check", "err", err)
165+
}
115166
delete(h.activeInstances, instance.address)
116167
h.Unlock()
117168
continue
@@ -124,25 +175,13 @@ func (h *HealthCheckInterceptors) iteration(ctx context.Context) error {
124175
instance.lastCheckTime.Store(time.Now())
125176

126177
go func(i *healthCheckEntry) {
127-
dialOpts, err := i.clientConfig.Config.DialOption(nil, nil)
128-
if err != nil {
129-
level.Error(h.logger).Log("msg", "error creating dialOpts to perform healthcheck", "address", i.address, "err", err)
130-
return
131-
}
132-
conn, err := grpc.NewClient(i.address, dialOpts...)
178+
client, err := i.getClient(h.healthClientFactory)
179+
133180
if err != nil {
134-
level.Error(h.logger).Log("msg", "error creating client to perform healthcheck", "address", i.address, "err", err)
181+
level.Error(h.logger).Log("msg", "error creating healthcheck client to perform healthcheck", "address", i.address, "err", err)
135182
return
136183
}
137184

138-
client, closer := h.healthClientFactory(conn)
139-
140-
defer func() {
141-
if err := closer.Close(); err != nil {
142-
level.Warn(h.logger).Log("msg", "error closing connection", "address", i.address, "err", err)
143-
}
144-
}()
145-
146185
if err := i.recordHealth(healthCheck(client, i.clientConfig.HealthCheckConfig.Timeout)); !i.isHealthy() {
147186
level.Warn(h.logger).Log("msg", "instance marked as unhealthy", "address", i.address, "err", err)
148187
}

pkg/util/grpcclient/health_check_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@ import (
1111
"github.com/stretchr/testify/require"
1212
"go.uber.org/atomic"
1313
"google.golang.org/grpc"
14+
"google.golang.org/grpc/codes"
1415
"google.golang.org/grpc/credentials/insecure"
1516
"google.golang.org/grpc/health/grpc_health_v1"
17+
"google.golang.org/grpc/status"
1618

1719
utillog "github.com/cortexproject/cortex/pkg/util/log"
1820
"github.com/cortexproject/cortex/pkg/util/services"
@@ -136,7 +138,8 @@ func TestNewHealthCheckInterceptors(t *testing.T) {
136138
require.False(t, hMock.open.Load())
137139

138140
cortex_testutil.Poll(t, time.Second, true, func() interface{} {
139-
return errors.Is(ui(context.Background(), "", struct{}{}, struct{}{}, ccUnhealthy, invoker), unhealthyErr)
141+
err := ui(context.Background(), "", struct{}{}, struct{}{}, ccUnhealthy, invoker)
142+
return errors.Is(err, unhealthyErr) || status.Code(err) == codes.Unavailable
140143
})
141144

142145
// Other instances should remain healthy

0 commit comments

Comments
 (0)