Skip to content

[ADDED] Support for route permissions config reload #753

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 27, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type clientFlag byte
// Some client state represented as flags
const (
connectReceived clientFlag = 1 << iota // The CONNECT proto has been received
infoReceived // The INFO protocol has been received
firstPongSent // The first PONG has been sent
handshakeComplete // For TLS clients, indicate that the handshake is complete
clearConnection // Marks that clearConnection has already been called.
Expand Down
200 changes: 175 additions & 25 deletions server/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,37 @@ type option interface {

// IsAuthChange indicates if this option requires reloading authorization.
IsAuthChange() bool

// IsClusterPermsChange indicates if this option requires reloading
// cluster permissions.
IsClusterPermsChange() bool
}

// noopOption is an embeddable base type whose change queries all answer
// false. Option types embed it and override only the queries that apply
// to them.
type noopOption struct{}

// IsLoggingChange reports false: by default an option does not signal a
// logging change.
func (noopOption) IsLoggingChange() bool { return false }

// IsAuthChange reports false: by default an option does not require
// reloading authorization.
func (noopOption) IsAuthChange() bool { return false }

// IsClusterPermsChange reports false: by default an option does not
// require reloading cluster permissions.
func (noopOption) IsClusterPermsChange() bool { return false }

// loggingOption is a base struct that provides default option behaviors for
// logging-related options.
type loggingOption struct{}
type loggingOption struct {
noopOption
}

func (l loggingOption) IsLoggingChange() bool {
return true
}

func (l loggingOption) IsAuthChange() bool {
return false
}

// traceOption implements the option interface for the `trace` setting.
type traceOption struct {
loggingOption
Expand Down Expand Up @@ -119,17 +136,6 @@ func (r *remoteSyslogOption) Apply(server *Server) {
server.Noticef("Reloaded: remote_syslog = %v", r.newValue)
}

// noopOption is an embeddable base type whose change queries all answer
// false. Option types embed it and override only the queries that apply
// to them.
type noopOption struct{}

// IsLoggingChange reports false: by default an option does not signal a
// logging change.
func (noopOption) IsLoggingChange() bool { return false }

// IsAuthChange reports false: by default an option does not require
// reloading authorization.
func (noopOption) IsAuthChange() bool { return false }

// tlsOption implements the option interface for the `tls` setting.
type tlsOption struct {
noopOption
Expand Down Expand Up @@ -164,10 +170,8 @@ func (t *tlsTimeoutOption) Apply(server *Server) {
}

// authOption is a base struct that provides default option behaviors.
type authOption struct{}

func (o authOption) IsLoggingChange() bool {
return false
type authOption struct {
noopOption
}

func (o authOption) IsAuthChange() bool {
Expand Down Expand Up @@ -235,7 +239,8 @@ func (u *usersOption) Apply(server *Server) {
// clusterOption implements the option interface for the `cluster` setting.
type clusterOption struct {
authOption
newValue ClusterOpts
newValue ClusterOpts
permsChanged bool
}

// Apply the cluster change.
Expand All @@ -256,6 +261,10 @@ func (c *clusterOption) Apply(server *Server) {
server.Noticef("Reloaded: cluster")
}

// IsClusterPermsChange reports whether this cluster option carries a change
// to the cluster permissions, as recorded in the permsChanged field.
func (c *clusterOption) IsClusterPermsChange() bool {
return c.permsChanged
}

// routesOption implements the option interface for the cluster `routes`
// setting.
type routesOption struct {
Expand Down Expand Up @@ -503,6 +512,10 @@ func (s *Server) reloadOptions(newOpts *Options) error {
if err != nil {
return err
}
// Need to save off previous cluster permissions
s.mu.Lock()
s.oldClusterPerms = s.opts.Cluster.Permissions
s.mu.Unlock()
s.setOpts(newOpts)
s.applyOptions(changed)
return nil
Expand Down Expand Up @@ -557,10 +570,12 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
diffOpts = append(diffOpts, &usersOption{newValue: newValue.([]*User)})
case "cluster":
newClusterOpts := newValue.(ClusterOpts)
if err := validateClusterOpts(oldValue.(ClusterOpts), newClusterOpts); err != nil {
oldClusterOpts := oldValue.(ClusterOpts)
if err := validateClusterOpts(oldClusterOpts, newClusterOpts); err != nil {
return nil, err
}
diffOpts = append(diffOpts, &clusterOption{newValue: newClusterOpts})
permsChanged := !reflect.DeepEqual(newClusterOpts.Permissions, oldClusterOpts.Permissions)
diffOpts = append(diffOpts, &clusterOption{newValue: newClusterOpts, permsChanged: permsChanged})
case "routes":
add, remove := diffRoutes(oldValue.([]*url.URL), newValue.([]*url.URL))
diffOpts = append(diffOpts, &routesOption{add: add, remove: remove})
Expand Down Expand Up @@ -612,8 +627,9 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {

func (s *Server) applyOptions(opts []option) {
var (
reloadLogging = false
reloadAuth = false
reloadLogging = false
reloadAuth = false
reloadClusterPerms = false
)
for _, opt := range opts {
opt.Apply(s)
Expand All @@ -623,6 +639,9 @@ func (s *Server) applyOptions(opts []option) {
if opt.IsAuthChange() {
reloadAuth = true
}
if opt.IsClusterPermsChange() {
reloadClusterPerms = true
}
}

if reloadLogging {
Expand All @@ -631,6 +650,9 @@ func (s *Server) applyOptions(opts []option) {
if reloadAuth {
s.reloadAuthorization()
}
if reloadClusterPerms {
s.reloadClusterPermissions()
}

s.Noticef("Reloaded server configuration")
}
Expand Down Expand Up @@ -674,6 +696,134 @@ func (s *Server) reloadAuthorization() {
}
}

// reloadClusterPermissions reconfigures the cluster's permissions
// and sets the permissions on all existing routes, sending an
// updated INFO protocol so that remotes can resend their local
// subs if needed, and sending local subs matching the cluster's
// import subjects.
func (s *Server) reloadClusterPermissions() {
s.mu.Lock()
var (
infoJSON []byte
oldPerms = s.oldClusterPerms
newPerms = s.opts.Cluster.Permissions
routes = make(map[uint64]*client, len(s.routes))
withNewProto int
)
// We can clear this now that we have captured it with oldPerms.
s.oldClusterPerms = nil
// Get all connected routes
for i, route := range s.routes {
// Count the number of routes that can understand receiving INFO updates.
route.mu.Lock()
if route.opts.Protocol >= routeProtoInfo {
withNewProto++
}
route.mu.Unlock()
routes[i] = route
}
// If new permissions is nil, then clear routeInfo import/export
if newPerms == nil {
s.routeInfo.Import = nil
s.routeInfo.Export = nil
} else {
s.routeInfo.Import = newPerms.Import
s.routeInfo.Export = newPerms.Export
}
// Regenerate route INFO
s.generateRouteInfoJSON()
infoJSON = s.routeInfoJSON
s.mu.Unlock()

// If there were no route, we are done
if len(routes) == 0 {
return
}

// If only older servers, simply close all routes and they will do the right
// thing on reconnect.
if withNewProto == 0 {
for _, route := range routes {
route.closeConnection(RouteRemoved)
}
return
}

// Fake clients to test cluster permissions
oldPermsTester := &client{}
oldPermsTester.setRoutePermissions(oldPerms)
newPermsTester := &client{}
newPermsTester.setRoutePermissions(newPerms)

var (
_localSubs [4096]*subscription
localSubs = _localSubs[:0]
subsNeedSUB []*subscription
subsNeedUNSUB []*subscription
deleteRoutedSubs []*subscription
)
s.sl.localSubs(&localSubs)

// Go through all local subscriptions
for _, sub := range localSubs {
sub.client.mu.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the lock here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used to have a flag in the sub recording whether we had already sent a SUB to routes, so that we would know whether or not to send it again. That approach wasn't great, but it explains why I had the lock here. Will remove it (we still access sub.subject, but that is immutable).

// Get all subs that can now be imported
couldImportThen := oldPermsTester.canImport(sub.subject)
canImportNow := newPermsTester.canImport(sub.subject)
if canImportNow {
// If we could not before, then will need to send a SUB protocol.
if !couldImportThen {
subsNeedSUB = append(subsNeedSUB, sub)
}
} else if couldImportThen {
// We were previously able to import this sub, but now
// we can't so we need to send an UNSUB protocol
subsNeedUNSUB = append(subsNeedUNSUB, sub)
}
sub.client.mu.Unlock()
}

for _, route := range routes {
route.mu.Lock()
// If route is to older server, simply close connection.
if route.opts.Protocol < routeProtoInfo {
route.mu.Unlock()
route.closeConnection(RouteRemoved)
continue
}
route.setRoutePermissions(newPerms)
for _, sub := range route.subs {
// If we can't export, we need to drop the subscriptions that
// we have on behalf of this route.
if !route.canExport(sub.subject) {
delete(route.subs, string(sub.sid))
deleteRoutedSubs = append(deleteRoutedSubs, sub)
}
}
// Send an update INFO, which will allow remote server to show
// our current route config in monitoring and resend subscriptions
// that we now possibly allow with a change of Export permissions.
route.sendInfo(infoJSON)
// Now send SUB and UNSUB protocols as needed.
for _, sub := range subsNeedSUB {
route.queueOutbound([]byte(fmt.Sprintf(subProto, sub.subject, sub.queue, routeSid(sub))))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These fmt calls can be expensive if we have a lot of subs/unsubs. It might be good to do some benchmarking with a high number of subscriptions to see if we want to change how these loops work.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would be the alternative, though?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Format by hand, avoid fmt.Sprintf, use a large staged buffer or something and only send occasionally to the socket etc.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought queueOutbound would do that (not sending right away), hence the presence of the signal. But the passed buffer may not be copied, so we can't "reuse" a stack buffer to build the protocol, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will reference (with sketchy ownership) if data passed in is larger than maxBufSize. Otherwise it copies it. We should just measure it and if there is a concern put a TODO comment in the code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do, but note that this is no different from a route sending its local subscription interest on route connect.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to optimize that one too IMO, and I will most likely during rewrite.. :)

if route.out.pb > int64(route.out.sz*2) {
route.flushSignal()
}
}
for _, sub := range subsNeedUNSUB {
route.queueOutbound([]byte(fmt.Sprintf(unsubProto, routeSid(sub))))
if route.out.pb > int64(route.out.sz*2) {
route.flushSignal()
}
}
route.flushSignal()
route.mu.Unlock()
}
// Remove as a batch all the subs that we have removed from each route.
s.sl.RemoveBatch(deleteRoutedSubs)
}

// validateClusterOpts ensures the new ClusterOpts does not change host or
// port, which do not support reload.
func validateClusterOpts(old, new ClusterOpts) error {
Expand Down
Loading