Skip to content

Commit d5ceade

Browse files
authored
Merge pull request #753 from nats-io/route_perms_reload
[ADDED] Support for route permissions config reload
2 parents 97ab2d6 + 4cd3453 commit d5ceade

File tree

6 files changed

+865
-55
lines changed

6 files changed

+865
-55
lines changed

server/client.go

+25-17
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ type clientFlag byte
6969
// Some client state represented as flags
7070
const (
7171
connectReceived clientFlag = 1 << iota // The CONNECT proto has been received
72+
infoReceived // The INFO protocol has been received
7273
firstPongSent // The first PONG has been sent
7374
handshakeComplete // For TLS clients, indicate that the handshake is complete
7475
clearConnection // Marks that clearConnection has already been called.
@@ -856,9 +857,12 @@ func (c *client) maxPayloadViolation(sz int, max int64) {
856857
}
857858

858859
// queueOutbound queues data for client/route connections.
859-
// Return pending length.
860+
// Returns whether the data is referenced. If referenced, the caller
861+
// should not reuse the `data` array.
860862
// Lock should be held.
861-
func (c *client) queueOutbound(data []byte) {
863+
func (c *client) queueOutbound(data []byte) bool {
864+
// Assume data will not be referenced
865+
referenced := false
862866
// Add to pending bytes total.
863867
c.out.pb += int64(len(data))
864868

@@ -868,7 +872,7 @@ func (c *client) queueOutbound(data []byte) {
868872
c.clearConnection(SlowConsumerPendingBytes)
869873
atomic.AddInt64(&c.srv.slowConsumers, 1)
870874
c.Noticef("Slow Consumer Detected: MaxPending of %d Exceeded", c.out.mp)
871-
return
875+
return referenced
872876
}
873877

874878
if c.out.p == nil && len(data) < maxBufSize {
@@ -901,6 +905,7 @@ func (c *client) queueOutbound(data []byte) {
901905
// FIXME(dlc) - do we need signaling of ownership here if we want len(data) < maxBufSize
902906
if len(data) > maxBufSize {
903907
c.out.nb = append(c.out.nb, data)
908+
referenced = true
904909
} else {
905910
// We will copy to primary.
906911
if c.out.p == nil {
@@ -924,6 +929,7 @@ func (c *client) queueOutbound(data []byte) {
924929
} else {
925930
c.out.p = append(c.out.p, data...)
926931
}
932+
return referenced
927933
}
928934

929935
// Assume the lock is held upon entry.
@@ -993,6 +999,12 @@ func (c *client) processPing() {
993999
}
9941000
c.sendPong()
9951001

1002+
// If not a CLIENT, we are done
1003+
if c.typ != CLIENT {
1004+
c.mu.Unlock()
1005+
return
1006+
}
1007+
9961008
// The CONNECT should have been received, but make sure it
9971009
// is so before proceeding
9981010
if !c.flags.isSet(connectReceived) {
@@ -1667,19 +1679,18 @@ func (c *client) processPingTimer() {
16671679

16681680
c.Debugf("%s Ping Timer", c.typeString())
16691681

1670-
// Check for violation
1671-
if c.ping.out+1 > c.srv.getOpts().MaxPingsOut {
1672-
c.Debugf("Stale Client Connection - Closing")
1673-
c.sendProto([]byte(fmt.Sprintf("-ERR '%s'\r\n", "Stale Connection")), true)
1674-
c.clearConnection(StaleConnection)
1675-
return
1676-
}
1677-
16781682
// If we have had activity within the PingInterval no
16791683
// need to send a ping.
16801684
if delta := time.Since(c.last); delta < c.srv.getOpts().PingInterval {
16811685
c.Debugf("Delaying PING due to activity %v ago", delta.Round(time.Second))
16821686
} else {
1687+
// Check for violation
1688+
if c.ping.out+1 > c.srv.getOpts().MaxPingsOut {
1689+
c.Debugf("Stale Client Connection - Closing")
1690+
c.sendProto([]byte(fmt.Sprintf("-ERR '%s'\r\n", "Stale Connection")), true)
1691+
c.clearConnection(StaleConnection)
1692+
return
1693+
}
16831694
// Send PING
16841695
c.sendPing()
16851696
}
@@ -1824,12 +1835,9 @@ func (c *client) closeConnection(reason ClosedState) {
18241835

18251836
// Remove clients subscriptions.
18261837
srv.sl.RemoveBatch(subs)
1827-
if c.typ != ROUTER {
1828-
for _, sub := range subs {
1829-
// Forward on unsubscribes if we are not
1830-
// a router ourselves.
1831-
srv.broadcastUnSubscribe(sub)
1832-
}
1838+
if c.typ == CLIENT {
1839+
// Forward UNSUB protocols to all routes
1840+
srv.broadcastUnSubscribeBatch(subs)
18331841
}
18341842
}
18351843

server/reload.go

+164-25
Original file line numberDiff line numberDiff line change
@@ -38,20 +38,37 @@ type option interface {
3838

3939
// IsAuthChange indicates if this option requires reloading authorization.
4040
IsAuthChange() bool
41+
42+
// IsClusterPermsChange indicates if this option requires reloading
43+
// cluster permissions.
44+
IsClusterPermsChange() bool
45+
}
46+
47+
// noopOption is a base struct that provides default no-op behaviors.
48+
type noopOption struct{}
49+
50+
func (n noopOption) IsLoggingChange() bool {
51+
return false
52+
}
53+
54+
func (n noopOption) IsAuthChange() bool {
55+
return false
56+
}
57+
58+
func (n noopOption) IsClusterPermsChange() bool {
59+
return false
4160
}
4261

4362
// loggingOption is a base struct that provides default option behaviors for
4463
// logging-related options.
45-
type loggingOption struct{}
64+
type loggingOption struct {
65+
noopOption
66+
}
4667

4768
func (l loggingOption) IsLoggingChange() bool {
4869
return true
4970
}
5071

51-
func (l loggingOption) IsAuthChange() bool {
52-
return false
53-
}
54-
5572
// traceOption implements the option interface for the `trace` setting.
5673
type traceOption struct {
5774
loggingOption
@@ -119,17 +136,6 @@ func (r *remoteSyslogOption) Apply(server *Server) {
119136
server.Noticef("Reloaded: remote_syslog = %v", r.newValue)
120137
}
121138

122-
// noopOption is a base struct that provides default no-op behaviors.
123-
type noopOption struct{}
124-
125-
func (n noopOption) IsLoggingChange() bool {
126-
return false
127-
}
128-
129-
func (n noopOption) IsAuthChange() bool {
130-
return false
131-
}
132-
133139
// tlsOption implements the option interface for the `tls` setting.
134140
type tlsOption struct {
135141
noopOption
@@ -164,10 +170,8 @@ func (t *tlsTimeoutOption) Apply(server *Server) {
164170
}
165171

166172
// authOption is a base struct that provides default option behaviors.
167-
type authOption struct{}
168-
169-
func (o authOption) IsLoggingChange() bool {
170-
return false
173+
type authOption struct {
174+
noopOption
171175
}
172176

173177
func (o authOption) IsAuthChange() bool {
@@ -235,7 +239,8 @@ func (u *usersOption) Apply(server *Server) {
235239
// clusterOption implements the option interface for the `cluster` setting.
236240
type clusterOption struct {
237241
authOption
238-
newValue ClusterOpts
242+
newValue ClusterOpts
243+
permsChanged bool
239244
}
240245

241246
// Apply the cluster change.
@@ -256,6 +261,10 @@ func (c *clusterOption) Apply(server *Server) {
256261
server.Noticef("Reloaded: cluster")
257262
}
258263

264+
func (c *clusterOption) IsClusterPermsChange() bool {
265+
return c.permsChanged
266+
}
267+
259268
// routesOption implements the option interface for the cluster `routes`
260269
// setting.
261270
type routesOption struct {
@@ -503,6 +512,10 @@ func (s *Server) reloadOptions(newOpts *Options) error {
503512
if err != nil {
504513
return err
505514
}
515+
// Need to save off previous cluster permissions
516+
s.mu.Lock()
517+
s.oldClusterPerms = s.opts.Cluster.Permissions
518+
s.mu.Unlock()
506519
s.setOpts(newOpts)
507520
s.applyOptions(changed)
508521
return nil
@@ -557,10 +570,12 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
557570
diffOpts = append(diffOpts, &usersOption{newValue: newValue.([]*User)})
558571
case "cluster":
559572
newClusterOpts := newValue.(ClusterOpts)
560-
if err := validateClusterOpts(oldValue.(ClusterOpts), newClusterOpts); err != nil {
573+
oldClusterOpts := oldValue.(ClusterOpts)
574+
if err := validateClusterOpts(oldClusterOpts, newClusterOpts); err != nil {
561575
return nil, err
562576
}
563-
diffOpts = append(diffOpts, &clusterOption{newValue: newClusterOpts})
577+
permsChanged := !reflect.DeepEqual(newClusterOpts.Permissions, oldClusterOpts.Permissions)
578+
diffOpts = append(diffOpts, &clusterOption{newValue: newClusterOpts, permsChanged: permsChanged})
564579
case "routes":
565580
add, remove := diffRoutes(oldValue.([]*url.URL), newValue.([]*url.URL))
566581
diffOpts = append(diffOpts, &routesOption{add: add, remove: remove})
@@ -612,8 +627,9 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
612627

613628
func (s *Server) applyOptions(opts []option) {
614629
var (
615-
reloadLogging = false
616-
reloadAuth = false
630+
reloadLogging = false
631+
reloadAuth = false
632+
reloadClusterPerms = false
617633
)
618634
for _, opt := range opts {
619635
opt.Apply(s)
@@ -623,6 +639,9 @@ func (s *Server) applyOptions(opts []option) {
623639
if opt.IsAuthChange() {
624640
reloadAuth = true
625641
}
642+
if opt.IsClusterPermsChange() {
643+
reloadClusterPerms = true
644+
}
626645
}
627646

628647
if reloadLogging {
@@ -631,6 +650,9 @@ func (s *Server) applyOptions(opts []option) {
631650
if reloadAuth {
632651
s.reloadAuthorization()
633652
}
653+
if reloadClusterPerms {
654+
s.reloadClusterPermissions()
655+
}
634656

635657
s.Noticef("Reloaded server configuration")
636658
}
@@ -674,6 +696,123 @@ func (s *Server) reloadAuthorization() {
674696
}
675697
}
676698

699+
// reloadClusterPermissions reconfigures the cluster's permissions
700+
// and sets the permissions on all existing routes, sending an
701+
// update INFO protocol so that remote can resend their local
702+
// subs if needed, and sending local subs matching cluster's
703+
// import subjects.
704+
func (s *Server) reloadClusterPermissions() {
705+
s.mu.Lock()
706+
var (
707+
infoJSON []byte
708+
oldPerms = s.oldClusterPerms
709+
newPerms = s.opts.Cluster.Permissions
710+
routes = make(map[uint64]*client, len(s.routes))
711+
withNewProto int
712+
)
713+
// We can clear this now that we have captured it with oldPerms.
714+
s.oldClusterPerms = nil
715+
// Get all connected routes
716+
for i, route := range s.routes {
717+
// Count the number of routes that can understand receiving INFO updates.
718+
route.mu.Lock()
719+
if route.opts.Protocol >= routeProtoInfo {
720+
withNewProto++
721+
}
722+
route.mu.Unlock()
723+
routes[i] = route
724+
}
725+
// If new permissions is nil, then clear routeInfo import/export
726+
if newPerms == nil {
727+
s.routeInfo.Import = nil
728+
s.routeInfo.Export = nil
729+
} else {
730+
s.routeInfo.Import = newPerms.Import
731+
s.routeInfo.Export = newPerms.Export
732+
}
733+
// Regenerate route INFO
734+
s.generateRouteInfoJSON()
735+
infoJSON = s.routeInfoJSON
736+
s.mu.Unlock()
737+
738+
// If there were no routes, we are done
739+
if len(routes) == 0 {
740+
return
741+
}
742+
743+
// If only older servers, simply close all routes and they will do the right
744+
// thing on reconnect.
745+
if withNewProto == 0 {
746+
for _, route := range routes {
747+
route.closeConnection(RouteRemoved)
748+
}
749+
return
750+
}
751+
752+
// Fake clients to test cluster permissions
753+
oldPermsTester := &client{}
754+
oldPermsTester.setRoutePermissions(oldPerms)
755+
newPermsTester := &client{}
756+
newPermsTester.setRoutePermissions(newPerms)
757+
758+
var (
759+
_localSubs [4096]*subscription
760+
localSubs = _localSubs[:0]
761+
subsNeedSUB []*subscription
762+
subsNeedUNSUB []*subscription
763+
deleteRoutedSubs []*subscription
764+
)
765+
s.sl.localSubs(&localSubs)
766+
767+
// Go through all local subscriptions
768+
for _, sub := range localSubs {
769+
// Get all subs that can now be imported
770+
couldImportThen := oldPermsTester.canImport(sub.subject)
771+
canImportNow := newPermsTester.canImport(sub.subject)
772+
if canImportNow {
773+
// If we could not before, then will need to send a SUB protocol.
774+
if !couldImportThen {
775+
subsNeedSUB = append(subsNeedSUB, sub)
776+
}
777+
} else if couldImportThen {
778+
// We were previously able to import this sub, but now
779+
// we can't so we need to send an UNSUB protocol
780+
subsNeedUNSUB = append(subsNeedUNSUB, sub)
781+
}
782+
}
783+
784+
for _, route := range routes {
785+
route.mu.Lock()
786+
// If route is to older server, simply close connection.
787+
if route.opts.Protocol < routeProtoInfo {
788+
route.mu.Unlock()
789+
route.closeConnection(RouteRemoved)
790+
continue
791+
}
792+
route.setRoutePermissions(newPerms)
793+
for _, sub := range route.subs {
794+
// If we can't export, we need to drop the subscriptions that
795+
// we have on behalf of this route.
796+
if !route.canExport(sub.subject) {
797+
delete(route.subs, string(sub.sid))
798+
deleteRoutedSubs = append(deleteRoutedSubs, sub)
799+
}
800+
}
801+
// Send an update INFO, which will allow remote server to show
802+
// our current route config in monitoring and resend subscriptions
803+
// that we now possibly allow with a change of Export permissions.
804+
route.sendInfo(infoJSON)
805+
// Now send SUB and UNSUB protocols as needed.
806+
closed := route.sendRouteSubProtos(subsNeedSUB, nil)
807+
if !closed {
808+
route.sendRouteUnSubProtos(subsNeedUNSUB, nil)
809+
}
810+
route.mu.Unlock()
811+
}
812+
// Remove as a batch all the subs that we have removed from each route.
813+
s.sl.RemoveBatch(deleteRoutedSubs)
814+
}
815+
677816
// validateClusterOpts ensures the new ClusterOpts does not change host or
678817
// port, which do not support reload.
679818
func validateClusterOpts(old, new ClusterOpts) error {

0 commit comments

Comments
 (0)