Commit a0cdc21

server.go: use worker goroutines for fewer stack allocations (#3204)
Currently (go1.13.4), the default stack size for newly spawned goroutines is 2048 bytes. This is insufficient when processing gRPC requests, as we often require more than 4 KiB of stack. This causes the Go runtime to call runtime.morestack at least twice per RPC, which hurts performance needlessly because stack reallocation requires all sorts of internal work, such as updating pointers to point to the new addresses.

Since this stack growth is guaranteed to happen at least twice per RPC, reusing goroutines gives us two wins:

1. The stack is already grown to 8 KiB after the first RPC, so subsequent RPCs do not call runtime.morestack.
2. We eliminate the need to spawn a new goroutine for each request (even though they are relatively inexpensive).

Performance improves across the board. The improvement is especially visible in small, unary requests, where the overhead of stack reallocation is higher percentage-wise. QPS is up anywhere between 3% and 5% depending on the number of concurrent RPC requests in flight, latency is down ~3%, and there is even a 1% decrease in memory footprint in some cases, though that is an unintended but happy coincidence.

Benchmark: unary-networkMode_none-bufConn_false-keepalive_false-benchTime_1m0s-trace_false-latency_0s-kbps_0-MTU_0-maxConcurrentCalls_8-reqSize_1B-respSize_1B-compressor_off-channelz_false-preloader_false

Title       Before       After        Percentage
TotalOps    2613512      2701705       3.37%
SendOps     0            0              NaN%
RecvOps     0            0              NaN%
Bytes/op    8657.00      8654.17      -0.03%
Allocs/op   173.37       173.28        0.00%
ReqT/op     348468.27    360227.33     3.37%
RespT/op    348468.27    360227.33     3.37%
50th-Lat    174.601µs    167.378µs    -4.14%
90th-Lat    233.132µs    229.087µs    -1.74%
99th-Lat    438.98µs     441.857µs     0.66%
Avg-Lat     183.263µs    177.26µs     -3.28%
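To make the reuse pattern concrete, here is a minimal, self-contained sketch of the idea described above. The names job, worker, handle, and resetThreshold are illustrative only and do not appear in the patch: a long-lived goroutine drains a channel so that the stack it grew for the first request is reused for every later one, and after a bounded number of iterations it replaces itself with a fresh goroutine so a large stack does not live in memory forever.

package main

import (
	"fmt"
	"sync"
)

// job stands in for the per-stream work a gRPC server would hand off.
type job struct {
	id int
	wg *sync.WaitGroup
}

// resetThreshold plays the role of serverWorkerResetThreshold: after this many
// jobs, the worker replaces itself so its grown stack can eventually be freed.
const resetThreshold = 1 << 10

// worker drains ch on a single long-lived goroutine, so the stack growth
// triggered by the first job is amortized across all later jobs.
func worker(ch chan job) {
	for completed := 0; completed < resetThreshold; completed++ {
		j, ok := <-ch
		if !ok {
			return
		}
		handle(j)
		j.wg.Done()
	}
	// Respawn with a fresh (small) stack instead of keeping this one forever.
	go worker(ch)
}

func handle(j job) {
	// Placeholder for real request processing.
	_ = fmt.Sprintf("job %d", j.id)
}

func main() {
	ch := make(chan job)
	go worker(ch)

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		ch <- job{id: i, wg: &wg}
	}
	wg.Wait()
	close(ch)
}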
1 parent 29f40a4 commit a0cdc21

File tree

1 file changed (+96, -4 lines)

server.go

Lines changed: 96 additions & 4 deletions
@@ -42,6 +42,7 @@ import (
 	"google.golang.org/grpc/grpclog"
 	"google.golang.org/grpc/internal/binarylog"
 	"google.golang.org/grpc/internal/channelz"
+	"google.golang.org/grpc/internal/grpcrand"
 	"google.golang.org/grpc/internal/grpcsync"
 	"google.golang.org/grpc/internal/transport"
 	"google.golang.org/grpc/keepalive"
@@ -87,6 +88,12 @@ type service struct {
 	mdata interface{}
 }
 
+type serverWorkerData struct {
+	st     transport.ServerTransport
+	wg     *sync.WaitGroup
+	stream *transport.Stream
+}
+
 // Server is a gRPC server to serve RPC requests.
 type Server struct {
 	opts serverOptions
@@ -107,6 +114,8 @@ type Server struct {
 
 	channelzID int64 // channelz unique identification number
 	czData     *channelzData
+
+	serverWorkerChannels []chan *serverWorkerData
 }
 
 type serverOptions struct {
@@ -133,6 +142,7 @@ type serverOptions struct {
 	connectionTimeout time.Duration
 	maxHeaderListSize *uint32
 	headerTableSize   *uint32
+	numServerWorkers  uint32
 }
 
 var defaultServerOptions = serverOptions{
@@ -410,6 +420,66 @@ func HeaderTableSize(s uint32) ServerOption {
 	})
 }
 
+// NumStreamWorkers returns a ServerOption that sets the number of worker
+// goroutines that should be used to process incoming streams. Setting this to
+// zero (default) will disable workers and spawn a new goroutine for each
+// stream.
+//
+// This API is EXPERIMENTAL.
+func NumStreamWorkers(numServerWorkers uint32) ServerOption {
+	// TODO: If/when this API gets stabilized (i.e. stream workers become the
+	// only way streams are processed), change the behavior of the zero value to
+	// a sane default. Preliminary experiments suggest that a value equal to the
+	// number of CPUs available is most performant; requires thorough testing.
+	return newFuncServerOption(func(o *serverOptions) {
+		o.numServerWorkers = numServerWorkers
+	})
+}
+
+// serverWorkerResetThreshold defines how often the stack must be reset. Every
+// N requests, by spawning a new goroutine in its place, a worker can reset its
+// stack so that large stacks don't live in memory forever. 2^16 should allow
+// each goroutine stack to live for at least a few seconds in a typical
+// workload (assuming a QPS of a few thousand requests/sec).
+const serverWorkerResetThreshold = 1 << 16
+
+// serverWorkers blocks on a *transport.Stream channel forever and waits for
+// data to be fed by serveStreams. This allows different requests to be
+// processed by the same goroutine, removing the need for expensive stack
+// re-allocations (see the runtime.morestack problem [1]).
+//
+// [1] https://github.com/golang/go/issues/18138
+func (s *Server) serverWorker(ch chan *serverWorkerData) {
+	// To make sure all server workers don't reset at the same time, choose a
+	// random number of iterations before resetting.
+	threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
+	for completed := 0; completed < threshold; completed++ {
+		data, ok := <-ch
+		if !ok {
+			return
+		}
+		s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
+		data.wg.Done()
+	}
+	go s.serverWorker(ch)
+}
+
+// initServerWorkers creates worker goroutines and channels to process incoming
+// connections to reduce the time spent overall on runtime.morestack.
+func (s *Server) initServerWorkers() {
+	s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
+	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
+		s.serverWorkerChannels[i] = make(chan *serverWorkerData)
+		go s.serverWorker(s.serverWorkerChannels[i])
+	}
+}
+
+func (s *Server) stopServerWorkers() {
+	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
+		close(s.serverWorkerChannels[i])
+	}
+}
+
 // NewServer creates a gRPC server which has no service registered and has not
 // started to accept requests yet.
 func NewServer(opt ...ServerOption) *Server {
@@ -434,6 +504,10 @@ func NewServer(opt ...ServerOption) *Server {
 		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
 	}
 
+	if s.opts.numServerWorkers > 0 {
+		s.initServerWorkers()
+	}
+
 	if channelz.IsOn() {
 		s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
 	}
@@ -739,12 +813,27 @@ func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) tr
 func (s *Server) serveStreams(st transport.ServerTransport) {
 	defer st.Close()
 	var wg sync.WaitGroup
+
+	var roundRobinCounter uint32
 	st.HandleStreams(func(stream *transport.Stream) {
 		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			s.handleStream(st, stream, s.traceInfo(st, stream))
-		}()
+		if s.opts.numServerWorkers > 0 {
+			data := &serverWorkerData{st: st, wg: &wg, stream: stream}
+			select {
+			case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
+			default:
+				// If all stream workers are busy, fallback to the default code path.
+				go func() {
+					s.handleStream(st, stream, s.traceInfo(st, stream))
+					wg.Done()
+				}()
+			}
+		} else {
+			go func() {
+				defer wg.Done()
+				s.handleStream(st, stream, s.traceInfo(st, stream))
+			}()
+		}
 	}, func(ctx context.Context, method string) context.Context {
 		if !EnableTracing {
 			return ctx
@@ -1507,6 +1596,9 @@ func (s *Server) Stop() {
 	for c := range st {
 		c.Close()
 	}
+	if s.opts.numServerWorkers > 0 {
+		s.stopServerWorkers()
+	}
 
 	s.mu.Lock()
 	if s.events != nil {
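For reference, here is a minimal sketch of how a server might opt in to the new option once this change lands. grpc.NumStreamWorkers is the API added in this commit; the choice of runtime.NumCPU() workers and the listen address are illustrative assumptions only (the TODO above merely notes that a worker count near the CPU count looked best in preliminary experiments).

package main

import (
	"log"
	"net"
	"runtime"

	"google.golang.org/grpc"
)

func main() {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	// NumStreamWorkers(0), the default, keeps the old behavior of spawning a
	// new goroutine per incoming stream; a non-zero value enables the workers.
	s := grpc.NewServer(grpc.NumStreamWorkers(uint32(runtime.NumCPU())))

	// Register services here before serving.

	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}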
