Skip to content

[ADDED] Support for route permissions config reload #753

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 27, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,7 @@ func (c *client) queueOutbound(data []byte) bool {
c.clearConnection(SlowConsumerPendingBytes)
atomic.AddInt64(&c.srv.slowConsumers, 1)
c.Noticef("Slow Consumer Detected: MaxPending of %d Exceeded", c.out.mp)
return false
return referenced
}

if c.out.p == nil && len(data) < maxBufSize {
Expand Down
39 changes: 24 additions & 15 deletions server/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,11 +612,18 @@ func (c *client) sendRouteUnSubProtos(subs []*subscription, filter func(sub *sub
// Use sendRouteSubProtos or sendRouteUnSubProtos instead for clarity.
// Lock is held on entry.
func (c *client) sendRouteSubOrUnSubProtos(subs []*subscription, isSubProto bool, filter func(sub *subscription) bool) bool {
const staticBufSize = maxBufSize * 2
var (
_buf [maxBufSize]byte
buf = _buf[:0]
_buf [staticBufSize]byte // array on stack
buf = _buf[:0] // our buffer will initially point to the stack buffer
mbs = staticBufSize // max size of the buffer
mpMax = int(c.out.mp * 90 / 100) // 90% of max_pending
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would do 1/2 here.

closed bool
)
// We need to make sure that we stay below the user defined max pending bytes.
if mbs > mpMax {
mbs = mpMax
}
for _, sub := range subs {
if filter != nil && !filter(sub) {
continue
Expand All @@ -633,22 +640,21 @@ func (c *client) sendRouteSubOrUnSubProtos(subs []*subscription, isSubProto bool
}
// rsid + "\r\n"
curSize += len(rsid) + 2
if curSize >= maxBufSize {
if curSize >= mbs {
if c.queueOutbound(buf) {
buf = make([]byte, 0, maxBufSize)
// Need to allocate new array
buf = make([]byte, 0, mbs)
} else {
// We can reuse previous buffer
buf = buf[:0]
}
if c.out.pb >= int64(c.out.sz*2) {
// Update last activity because flushOutbound() will release
// the lock, which could cause pingTimer to think that this
// connection is stale otherwise.
c.last = time.Now()
c.flushOutbound()
closed = c.flags.isSet(clearConnection)
if closed {
break
}
// Update last activity because flushOutbound() will release
// the lock, which could cause pingTimer to think that this
// connection is stale otherwise.
c.last = time.Now()
c.flushOutbound()
if closed = c.flags.isSet(clearConnection); closed {
break
}
}
if isSubProto {
Expand Down Expand Up @@ -1028,8 +1034,11 @@ func (s *Server) broadcastUnSubscribe(sub *subscription) {
// to all connected routes. Used when a client connection is closed. Note
// that when that happens, the subscriptions' MAX have been cleared (force unsub).
func (s *Server) broadcastUnSubscribeBatch(subs []*subscription) {
var (
_routes [32]*client
routes = _routes[:0]
)
s.mu.Lock()
var routes []*client
for _, route := range s.routes {
routes = append(routes, route)
}
Expand Down
29 changes: 29 additions & 0 deletions server/routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1089,3 +1089,32 @@ func TestRoutePermsAppliedOnInboundAndOutboundRoute(t *testing.T) {
// Now check for permissions set on server initiating the route connection
check(t, srvb)
}

func TestRouteSendLocalSubsWithLowMaxPending(t *testing.T) {
optsA := DefaultOptions()
optsA.MaxPending = 1024
srvA := RunServer(optsA)
defer srvA.Shutdown()

nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
numSubs := 1000
for i := 0; i < numSubs; i++ {
nc.Subscribe("foo.bar", func(_ *nats.Msg) {})
}
checkExpectedSubs(t, numSubs, srvA)

// Now create a route between B and A
optsB := DefaultOptions()
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host, optsA.Cluster.Port))
srvB := RunServer(optsB)
defer srvB.Shutdown()

checkClusterFormed(t, srvA, srvB)

// Check that all subs have been sent ok
checkExpectedSubs(t, numSubs, srvA, srvB)
}