Skip to content

Commit dc49de8

Browse files
authored
balancer: add V2Picker, ClientConn.UpdateState, SubConnState.ConnectionError (#3186)
Also implement V2 versions of base.*, xds, pickfirst, grpclb, and round robin balancers.
1 parent 7c1d326 commit dc49de8

34 files changed

+701
-560
lines changed

balancer/balancer.go

Lines changed: 98 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,15 @@ type NewSubConnOptions struct {
117117
HealthCheckEnabled bool
118118
}
119119

120+
// State contains the balancer's state relevant to the gRPC ClientConn.
121+
type State struct {
122+
// State contains the connectivity state of the balancer, which is used to
123+
// determine the state of the ClientConn.
124+
ConnectivityState connectivity.State
125+
// Picker is used to choose connections (SubConns) for RPCs.
126+
Picker V2Picker
127+
}
128+
120129
// ClientConn represents a gRPC ClientConn.
121130
//
122131
// This interface is to be implemented by gRPC. Users should not need a
@@ -137,8 +146,17 @@ type ClientConn interface {
137146
//
138147
// gRPC will update the connectivity state of the ClientConn, and will call pick
139148
// on the new picker to pick new SubConn.
149+
//
150+
// Deprecated: use UpdateState instead
140151
UpdateBalancerState(s connectivity.State, p Picker)
141152

153+
// UpdateState notifies gRPC that the balancer's internal state has
154+
// changed.
155+
//
156+
// gRPC will update the connectivity state of the ClientConn, and will call pick
157+
// on the new picker to pick new SubConns.
158+
UpdateState(State)
159+
142160
// ResolveNow is called by balancer to notify gRPC to do a name resolving.
143161
ResolveNow(resolver.ResolveNowOptions)
144162

@@ -185,11 +203,19 @@ type ConfigParser interface {
185203
ParseConfig(LoadBalancingConfigJSON json.RawMessage) (serviceconfig.LoadBalancingConfig, error)
186204
}
187205

188-
// PickOptions contains addition information for the Pick operation.
189-
type PickOptions struct {
206+
// PickOptions is a type alias of PickInfo for legacy reasons.
207+
//
208+
// Deprecated: use PickInfo instead.
209+
type PickOptions = PickInfo
210+
211+
// PickInfo contains additional information for the Pick operation.
212+
type PickInfo struct {
190213
// FullMethodName is the method name that NewClientStream() is called
191214
// with. The canonical format is /service/Method.
192215
FullMethodName string
216+
// Ctx is the RPC's context, and may contain relevant RPC-level information
217+
// like the outgoing header metadata.
218+
Ctx context.Context
193219
}
194220

195221
// DoneInfo contains additional information for done.
@@ -215,14 +241,16 @@ var (
215241
ErrNoSubConnAvailable = errors.New("no SubConn is available")
216242
// ErrTransientFailure indicates all SubConns are in TransientFailure.
217243
// WaitForReady RPCs will block, non-WaitForReady RPCs will fail.
218-
ErrTransientFailure = errors.New("all SubConns are in TransientFailure")
244+
ErrTransientFailure = TransientFailureError(errors.New("all SubConns are in TransientFailure"))
219245
)
220246

221247
// Picker is used by gRPC to pick a SubConn to send an RPC.
222248
// Balancer is expected to generate a new picker from its snapshot every time its
223249
// internal state has changed.
224250
//
225251
// The pickers used by gRPC can be updated by ClientConn.UpdateBalancerState().
252+
//
253+
// Deprecated: use V2Picker instead
226254
type Picker interface {
227255
// Pick returns the SubConn to be used to send the RPC.
228256
// The returned SubConn must be one returned by NewSubConn().
@@ -243,18 +271,76 @@ type Picker interface {
243271
//
244272
// If the returned error is not nil:
245273
// - If the error is ErrNoSubConnAvailable, gRPC will block until UpdateBalancerState()
246-
// - If the error is ErrTransientFailure:
274+
// - If the error is ErrTransientFailure or implements IsTransientFailure()
275+
// bool, returning true:
247276
// - If the RPC is wait-for-ready, gRPC will block until UpdateBalancerState()
248277
// is called to pick again;
249278
// - Otherwise, RPC will fail with unavailable error.
250279
// - Else (error is other non-nil error):
251-
// - The RPC will fail with unavailable error.
280+
// - The RPC will fail with the error's status code, or Unknown if it is
281+
// not a status error.
252282
//
253283
// The returned done() function will be called once the rpc has finished,
254284
// with the final status of that RPC. If the SubConn returned is not a
255285
// valid SubConn type, done may not be called. done may be nil if balancer
256286
// doesn't care about the RPC status.
257-
Pick(ctx context.Context, opts PickOptions) (conn SubConn, done func(DoneInfo), err error)
287+
Pick(ctx context.Context, info PickInfo) (conn SubConn, done func(DoneInfo), err error)
288+
}
289+
290+
// PickResult contains information related to a connection chosen for an RPC.
291+
type PickResult struct {
292+
// SubConn is the connection to use for this pick, if its state is Ready.
293+
// If the state is not Ready, gRPC will block the RPC until a new Picker is
294+
// provided by the balancer (using ClientConn.UpdateState). The SubConn
295+
// must be one returned by ClientConn.NewSubConn.
296+
SubConn SubConn
297+
298+
// Done is called when the RPC is completed. If the SubConn is not ready,
299+
// this will be called with a nil parameter. If the SubConn is not a valid
300+
// type, Done may not be called. May be nil if the balancer does not wish
301+
// to be notified when the RPC completes.
302+
Done func(DoneInfo)
303+
}
304+
305+
type transientFailureError struct {
306+
error
307+
}
308+
309+
func (e *transientFailureError) IsTransientFailure() bool { return true }
310+
311+
// TransientFailureError wraps err in an error implementing
312+
// IsTransientFailure() bool, returning true.
313+
func TransientFailureError(err error) error {
314+
return &transientFailureError{error: err}
315+
}
316+
317+
// V2Picker is used by gRPC to pick a SubConn to send an RPC.
318+
// Balancer is expected to generate a new picker from its snapshot every time its
319+
// internal state has changed.
320+
//
321+
// The pickers used by gRPC can be updated by ClientConn.UpdateBalancerState().
322+
type V2Picker interface {
323+
// Pick returns the connection to use for this RPC and related information.
324+
//
325+
// Pick should not block. If the balancer needs to do I/O or any blocking
326+
// or time-consuming work to service this call, it should return
327+
// ErrNoSubConnAvailable, and the Pick call will be repeated by gRPC when
328+
// the Picker is updated (using ClientConn.UpdateState).
329+
//
330+
// If an error is returned:
331+
//
332+
// - If the error is ErrNoSubConnAvailable, gRPC will block until a new
333+
// Picker is provided by the balancer (using ClientConn.UpdateState).
334+
//
335+
// - If the error implements IsTransientFailure() bool, returning true,
336+
// wait for ready RPCs will wait, but non-wait for ready RPCs will be
337+
// terminated with this error's Error() string and status code
338+
// Unavailable.
339+
//
340+
// - Any other errors terminate all RPCs with the code and message
341+
// provided. If the error is not a status error, it will be converted by
342+
// gRPC to a status error with code Unknown.
343+
Pick(info PickInfo) (PickResult, error)
258344
}
259345

260346
// Balancer takes input from gRPC, manages SubConns, and collects and aggregates
@@ -292,8 +378,11 @@ type Balancer interface {
292378

293379
// SubConnState describes the state of a SubConn.
294380
type SubConnState struct {
381+
// ConnectivityState is the connectivity state of the SubConn.
295382
ConnectivityState connectivity.State
296-
// TODO: add last connection error
383+
// ConnectionError is set if the ConnectivityState is TransientFailure,
384+
// describing the reason the SubConn failed. Otherwise, it is nil.
385+
ConnectionError error
297386
}
298387

299388
// ClientConnState describes the state of a ClientConn relevant to the
@@ -335,9 +424,8 @@ type V2Balancer interface {
335424
//
336425
// It's not thread safe.
337426
type ConnectivityStateEvaluator struct {
338-
numReady uint64 // Number of addrConns in ready state.
339-
numConnecting uint64 // Number of addrConns in connecting state.
340-
numTransientFailure uint64 // Number of addrConns in transientFailure.
427+
numReady uint64 // Number of addrConns in ready state.
428+
numConnecting uint64 // Number of addrConns in connecting state.
341429
}
342430

343431
// RecordTransition records state change happening in subConn and based on that
@@ -357,8 +445,6 @@ func (cse *ConnectivityStateEvaluator) RecordTransition(oldState, newState conne
357445
cse.numReady += updateVal
358446
case connectivity.Connecting:
359447
cse.numConnecting += updateVal
360-
case connectivity.TransientFailure:
361-
cse.numTransientFailure += updateVal
362448
}
363449
}
364450

balancer/base/balancer.go

Lines changed: 84 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package base
2020

2121
import (
2222
"context"
23+
"errors"
2324

2425
"google.golang.org/grpc/balancer"
2526
"google.golang.org/grpc/connectivity"
@@ -28,25 +29,32 @@ import (
2829
)
2930

3031
type baseBuilder struct {
31-
name string
32-
pickerBuilder PickerBuilder
33-
config Config
32+
name string
33+
pickerBuilder PickerBuilder
34+
v2PickerBuilder V2PickerBuilder
35+
config Config
3436
}
3537

3638
func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
37-
return &baseBalancer{
38-
cc: cc,
39-
pickerBuilder: bb.pickerBuilder,
39+
bal := &baseBalancer{
40+
cc: cc,
41+
pickerBuilder: bb.pickerBuilder,
42+
v2PickerBuilder: bb.v2PickerBuilder,
4043

4144
subConns: make(map[resolver.Address]balancer.SubConn),
4245
scStates: make(map[balancer.SubConn]connectivity.State),
4346
csEvltr: &balancer.ConnectivityStateEvaluator{},
44-
// Initialize picker to a picker that always return
45-
// ErrNoSubConnAvailable, because when state of a SubConn changes, we
46-
// may call UpdateBalancerState with this picker.
47-
picker: NewErrPicker(balancer.ErrNoSubConnAvailable),
48-
config: bb.config,
47+
config: bb.config,
4948
}
49+
// Initialize picker to a picker that always returns
50+
// ErrNoSubConnAvailable, because when state of a SubConn changes, we
51+
// may call UpdateState with this picker.
52+
if bb.pickerBuilder != nil {
53+
bal.picker = NewErrPicker(balancer.ErrNoSubConnAvailable)
54+
} else {
55+
bal.v2Picker = NewErrPickerV2(balancer.ErrNoSubConnAvailable)
56+
}
57+
return bal
5058
}
5159

5260
func (bb *baseBuilder) Name() string {
@@ -56,24 +64,33 @@ func (bb *baseBuilder) Name() string {
5664
var _ balancer.V2Balancer = (*baseBalancer)(nil) // Assert that we implement V2Balancer
5765

5866
type baseBalancer struct {
59-
cc balancer.ClientConn
60-
pickerBuilder PickerBuilder
67+
cc balancer.ClientConn
68+
pickerBuilder PickerBuilder
69+
v2PickerBuilder V2PickerBuilder
6170

6271
csEvltr *balancer.ConnectivityStateEvaluator
6372
state connectivity.State
6473

6574
subConns map[resolver.Address]balancer.SubConn
6675
scStates map[balancer.SubConn]connectivity.State
6776
picker balancer.Picker
77+
v2Picker balancer.V2Picker
6878
config Config
6979
}
7080

7181
func (b *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
7282
panic("not implemented")
7383
}
7484

75-
func (b *baseBalancer) ResolverError(error) {
76-
// Ignore
85+
func (b *baseBalancer) ResolverError(err error) {
86+
switch b.state {
87+
case connectivity.TransientFailure, connectivity.Idle, connectivity.Connecting:
88+
if b.picker != nil {
89+
b.picker = NewErrPicker(err)
90+
} else {
91+
b.v2Picker = NewErrPickerV2(err)
92+
}
93+
}
7794
}
7895

7996
func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
@@ -114,20 +131,44 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
114131
// from it. The picker is
115132
// - errPicker with ErrTransientFailure if the balancer is in TransientFailure,
116133
// - built by the pickerBuilder with all READY SubConns otherwise.
117-
func (b *baseBalancer) regeneratePicker() {
134+
func (b *baseBalancer) regeneratePicker(err error) {
118135
if b.state == connectivity.TransientFailure {
119-
b.picker = NewErrPicker(balancer.ErrTransientFailure)
136+
if b.pickerBuilder != nil {
137+
b.picker = NewErrPicker(balancer.ErrTransientFailure)
138+
} else {
139+
if err != nil {
140+
b.v2Picker = NewErrPickerV2(balancer.TransientFailureError(err))
141+
} else {
142+
// This means the last subchannel transition was not to
143+
// TransientFailure (otherwise err must be set), but the
144+
// aggregate state of the balancer is TransientFailure, meaning
145+
// there are no other addresses.
146+
b.v2Picker = NewErrPickerV2(balancer.TransientFailureError(errors.New("resolver returned no addresses")))
147+
}
148+
}
120149
return
121150
}
122-
readySCs := make(map[resolver.Address]balancer.SubConn)
151+
if b.pickerBuilder != nil {
152+
readySCs := make(map[resolver.Address]balancer.SubConn)
123153

124-
// Filter out all ready SCs from full subConn map.
125-
for addr, sc := range b.subConns {
126-
if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
127-
readySCs[addr] = sc
154+
// Filter out all ready SCs from full subConn map.
155+
for addr, sc := range b.subConns {
156+
if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
157+
readySCs[addr] = sc
158+
}
159+
}
160+
b.picker = b.pickerBuilder.Build(readySCs)
161+
} else {
162+
readySCs := make(map[balancer.SubConn]SubConnInfo)
163+
164+
// Filter out all ready SCs from full subConn map.
165+
for addr, sc := range b.subConns {
166+
if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
167+
readySCs[sc] = SubConnInfo{Address: addr}
168+
}
128169
}
170+
b.v2Picker = b.v2PickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})
129171
}
130-
b.picker = b.pickerBuilder.Build(readySCs)
131172
}
132173

133174
func (b *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
@@ -166,10 +207,14 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su
166207
// - the aggregated state of balancer became non-TransientFailure from TransientFailure
167208
if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
168209
(b.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
169-
b.regeneratePicker()
210+
b.regeneratePicker(state.ConnectionError)
170211
}
171212

172-
b.cc.UpdateBalancerState(b.state, b.picker)
213+
if b.picker != nil {
214+
b.cc.UpdateBalancerState(b.state, b.picker)
215+
} else {
216+
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.v2Picker})
217+
}
173218
}
174219

175220
// Close is a nop because base balancer doesn't have internal state to clean up,
@@ -186,6 +231,19 @@ type errPicker struct {
186231
err error // Pick() always returns this err.
187232
}
188233

189-
func (p *errPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
234+
func (p *errPicker) Pick(context.Context, balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
190235
return nil, nil, p.err
191236
}
237+
238+
// NewErrPickerV2 returns a V2Picker that always returns err on Pick().
239+
func NewErrPickerV2(err error) balancer.V2Picker {
240+
return &errPickerV2{err: err}
241+
}
242+
243+
type errPickerV2 struct {
244+
err error // Pick() always returns this err.
245+
}
246+
247+
func (p *errPickerV2) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
248+
return balancer.PickResult{}, p.err
249+
}

0 commit comments

Comments
 (0)