
Commit 7572f55

Author: Adhityaa Chandrasekar
V2: server option, stream ID bitshift
1 parent c3ba2b5 commit 7572f55

File tree

1 file changed (+41 −31 lines)


server.go

Lines changed: 41 additions & 31 deletions
@@ -131,6 +131,7 @@ type serverOptions struct {
 	connectionTimeout time.Duration
 	maxHeaderListSize *uint32
 	headerTableSize   *uint32
+	numStreamWorkers  uint32
 }

 var defaultServerOptions = serverOptions{
@@ -388,6 +389,22 @@ func HeaderTableSize(s uint32) ServerOption {
 	})
 }

+// NumStreamWorkers returns a ServerOption that sets the number of worker
+// goroutines that should be used to process incoming streams. Setting this to
+// zero (default) will disable workers and spawn a new goroutine for each
+// stream.
+//
+// This API is EXPERIMENTAL.
+func NumStreamWorkers(numStreamWorkers uint32) ServerOption {
+	// TODO: If/when this API gets stabilized (i.e. stream workers become the
+	// only way streams are processed), change the behavior of the zero value to
+	// a sane default. Preliminary experiments suggest that a value equal to the
+	// number of CPUs available is most performant; requires thorough testing.
+	return newFuncServerOption(func(o *serverOptions) {
+		o.numStreamWorkers = numStreamWorkers
+	})
+}
+
 // NewServer creates a gRPC server which has no service registered and has not
 // started to accept requests yet.
 func NewServer(opt ...ServerOption) *Server {
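For context, a minimal usage sketch of the new option, assuming it is exposed as grpc.NumStreamWorkers in the public google.golang.org/grpc package; the runtime.NumCPU sizing below is illustrative and not part of this commit:

```go
package main

import (
	"runtime"

	"google.golang.org/grpc"
)

func main() {
	// Size the worker pool to the CPU count; 0 (the default) keeps the
	// spawn-a-goroutine-per-stream behavior described in the doc comment.
	s := grpc.NewServer(grpc.NumStreamWorkers(uint32(runtime.NumCPU())))
	defer s.Stop()
	// ... register services and call s.Serve(lis) as usual ...
}
```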
@@ -712,17 +729,6 @@ func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) tr
 	return st
 }

-func floorCPUCount() uint32 {
-	n := uint32(runtime.NumCPU())
-	for i := uint32(1 << 31); i >= 2; i >>= 1 {
-		if n&i > 0 {
-			return i
-		}
-	}
-
-	return 1
-}
-
 // workerStackReset defines how often the stack must be reset. Every N
 // requests, by spawning a new goroutine in its place, a worker can reset its
 // stack so that large stacks don't live in memory forever. 2^16 should allow
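For reference, the removed helper computed the largest power of two not exceeding runtime.NumCPU(), which is what allowed the old dispatch code to index workers with a bitmask instead of a modulo. A standalone sketch of the same computation using math/bits, as an illustration only (not code from this commit):

```go
package main

import (
	"fmt"
	"math/bits"
	"runtime"
)

func main() {
	n := uint32(runtime.NumCPU()) // always >= 1
	// Largest power of two <= n, e.g. 6 CPUs -> 4 workers;
	// equivalent to the loop in the removed floorCPUCount.
	floor := uint32(1) << (bits.Len32(n) - 1)
	fmt.Println(floor)
}
```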
@@ -749,34 +755,36 @@ func (s *Server) streamWorker(st transport.ServerTransport, wg *sync.WaitGroup,
 	}
 }

-// numWorkers defines the number of stream handling workers. After experiments
-// with different CPU counts, using the floor of the number of CPUs available
-// was found to be the number optimal for performance across the board (QPS,
-// latency).
-var numWorkers = floorCPUCount()
-
-// workerMask is used to perform bitwise AND operations instead of expensive
-// module operations on integers.
-var workerMask = numWorkers - 1
-
 func (s *Server) serveStreams(st transport.ServerTransport) {
 	defer st.Close()
 	var wg sync.WaitGroup

-	streamChannels := make([]chan *transport.Stream, numWorkers)
-	for i := range streamChannels {
-		streamChannels[i] = make(chan *transport.Stream)
-		go s.streamWorker(st, &wg, streamChannels[i])
+	var streamChannels []chan *transport.Stream
+	if s.opts.numStreamWorkers > 0 {
+		streamChannels = make([]chan *transport.Stream, s.opts.numStreamWorkers)
+		for i := range streamChannels {
+			streamChannels[i] = make(chan *transport.Stream)
+			go s.streamWorker(st, &wg, streamChannels[i])
+		}
 	}

+	var streamChannelCounter uint32
 	st.HandleStreams(func(stream *transport.Stream) {
 		wg.Add(1)
-		select {
-		case streamChannels[stream.ID()&workerMask] <- stream:
-		default:
+		if s.opts.numStreamWorkers > 0 {
+			select {
+			case streamChannels[atomic.AddUint32(&streamChannelCounter, 1)%s.opts.numStreamWorkers] <- stream:
+			default:
+				// If all stream workers are busy, fallback to default code path.
+				go func() {
+					s.handleStream(st, stream, s.traceInfo(st, stream))
+					wg.Done()
+				}()
+			}
+		} else {
 			go func() {
+				defer wg.Done()
 				s.handleStream(st, stream, s.traceInfo(st, stream))
-				wg.Done()
 			}()
 		}
 	}, func(ctx context.Context, method string) context.Context {
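The new dispatch logic replaces the stream-ID bitmask with an atomically incremented round-robin counter, and keeps a non-blocking select so that a fully busy worker pool falls back to a fresh goroutine. A self-contained sketch of that pattern, using illustrative names rather than the actual server fields:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	const numWorkers = 4
	channels := make([]chan int, numWorkers)
	var wg sync.WaitGroup

	// Long-lived workers, each draining its own unbuffered channel.
	for i := range channels {
		channels[i] = make(chan int)
		go func(ch chan int) {
			for job := range ch {
				fmt.Println("worker handled job", job)
				wg.Done()
			}
		}(channels[i])
	}

	var counter uint32
	for job := 0; job < 16; job++ {
		wg.Add(1)
		select {
		case channels[atomic.AddUint32(&counter, 1)%numWorkers] <- job:
			// An idle worker accepted the job.
		default:
			// Every worker was busy: fall back to a one-off goroutine,
			// mirroring the default code path in the diff above.
			go func(job int) {
				fmt.Println("fallback handled job", job)
				wg.Done()
			}(job)
		}
	}

	wg.Wait()
	for _, ch := range channels {
		close(ch)
	}
}
```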
@@ -788,8 +796,10 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
 	})
 	wg.Wait()

-	for _, ch := range streamChannels {
-		close(ch)
+	if s.opts.numStreamWorkers > 0 {
+		for _, ch := range streamChannels {
+			close(ch)
+		}
 	}
 }