Skip to content

[FIXED] Locking issue around account lookup/updates #1131

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 18, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 18 additions & 28 deletions server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,17 +291,13 @@ func (a *Account) numLocalLeafNodes() int {

// MaxTotalConnectionsReached returns if we have reached our limit for number of connections.
func (a *Account) MaxTotalConnectionsReached() bool {
var mtc bool
a.mu.RLock()
mtc := a.maxTotalConnectionsReached()
a.mu.RUnlock()
return mtc
}

func (a *Account) maxTotalConnectionsReached() bool {
if a.mconns != jwt.NoLimit {
return len(a.clients)-int(a.sysclients)+int(a.nrclients) >= int(a.mconns)
mtc = len(a.clients)-int(a.sysclients)+int(a.nrclients) >= int(a.mconns)
}
return false
a.mu.RUnlock()
return mtc
}

// MaxActiveConnections return the set limit for the account system
Expand Down Expand Up @@ -1456,8 +1452,9 @@ func (s *Server) SetAccountResolver(ar AccountResolver) {
// AccountResolver returns the registered account resolver.
func (s *Server) AccountResolver() AccountResolver {
s.mu.Lock()
defer s.mu.Unlock()
return s.accResolver
ar := s.accResolver
s.mu.Unlock()
return ar
}

// UpdateAccountClaims will call updateAccountClaims.
Expand All @@ -1467,6 +1464,7 @@ func (s *Server) UpdateAccountClaims(a *Account, ac *jwt.AccountClaims) {

// updateAccountClaims will update an existing account with new claims.
// This will replace any exports or imports previously defined.
// Lock MUST NOT be held upon entry.
func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
if a == nil {
return
Expand Down Expand Up @@ -1547,22 +1545,10 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
}
}
for _, i := range ac.Imports {
var acc *Account
if v, ok := s.accounts.Load(i.Account); ok {
acc = v.(*Account)
}
if acc == nil {
// Check to see if the account referenced is not one that
// we are currently building (but not yet fully registered).
if v, ok := s.tmpAccounts.Load(i.Account); ok {
acc = v.(*Account)
}
}
if acc == nil {
if acc, _ = s.fetchAccount(i.Account); acc == nil {
s.Debugf("Can't locate account [%s] for import of [%v] %s", i.Account, i.Subject, i.Type)
continue
}
acc, err := s.lookupAccount(i.Account)
if acc == nil || err != nil {
s.Errorf("Can't locate account [%s] for import of [%v] %s (err=%v)", i.Account, i.Subject, i.Type, err)
continue
}
switch i.Type {
case jwt.Stream:
Expand Down Expand Up @@ -1645,14 +1631,17 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {

clients := gatherClients()
// Sort if we are over the limit.
if a.maxTotalConnectionsReached() {
if a.MaxTotalConnectionsReached() {
sort.Slice(clients, func(i, j int) bool {
return clients[i].start.After(clients[j].start)
})
}
now := time.Now().Unix()
for i, c := range clients {
if a.mconns != jwt.NoLimit && i >= int(a.mconns) {
a.mu.RLock()
exceeded := a.mconns != jwt.NoLimit && i >= int(a.mconns)
a.mu.RUnlock()
if exceeded {
c.maxAccountConnExceeded()
continue
}
Expand Down Expand Up @@ -1690,6 +1679,7 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
}

// Helper to build an internal account structure from a jwt.AccountClaims.
// Lock MUST NOT be held upon entry.
func (s *Server) buildInternalAccount(ac *jwt.AccountClaims) *Account {
acc := NewAccount(ac.Subject)
acc.Issuer = ac.Issuer
Expand Down
8 changes: 3 additions & 5 deletions server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,9 +572,7 @@ func (s *Server) initEventTracking() {

// accountClaimUpdate will receive claim updates for accounts.
func (s *Server) accountClaimUpdate(sub *subscription, _ *client, subject, reply string, msg []byte) {
s.mu.Lock()
defer s.mu.Unlock()
if !s.eventsEnabled() {
if !s.EventsEnabled() {
return
}
toks := strings.Split(subject, tsep)
Expand Down Expand Up @@ -903,8 +901,8 @@ func (s *Server) accountDisconnectEvent(c *client, now time.Time, reason string)
RTT: c.getRTT(),
},
Sent: DataStats{
Msgs: c.inMsgs,
Bytes: c.inBytes,
Msgs: atomic.LoadInt64(&c.inMsgs),
Bytes: atomic.LoadInt64(&c.inBytes),
},
Received: DataStats{
Msgs: c.outMsgs,
Expand Down
8 changes: 3 additions & 5 deletions server/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,7 @@ func TestSystemAccountConnectionLimitsServersStaggered(t *testing.T) {
}

// Restart server B.
optsB.AccountResolver = sa.accResolver
optsB.AccountResolver = sa.AccountResolver()
optsB.SystemAccount = sa.systemAccount().Name
sb = RunServer(optsB)
defer sb.Shutdown()
Expand Down Expand Up @@ -1409,10 +1409,8 @@ func TestFetchAccountRace(t *testing.T) {

// Replace B's account resolver with one that introduces
// delay during the Fetch()
sb.mu.Lock()
sac := &slowAccResolver{AccountResolver: sb.accResolver}
sb.accResolver = sac
sb.mu.Unlock()
sac := &slowAccResolver{AccountResolver: sb.AccountResolver()}
sb.SetAccountResolver(sac)

// Add the account in sa and sb
addAccountToMemResolver(sa, userAcc, jwt)
Expand Down
86 changes: 80 additions & 6 deletions server/jwt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,11 @@ func opTrustBasicSetup() *Server {

func buildMemAccResolver(s *Server) {
mr := &MemAccResolver{}
s.mu.Lock()
s.accResolver = mr
s.mu.Unlock()
s.SetAccountResolver(mr)
}

func addAccountToMemResolver(s *Server, pub, jwtclaim string) {
s.mu.Lock()
s.accResolver.Store(pub, jwtclaim)
s.mu.Unlock()
s.AccountResolver().Store(pub, jwtclaim)
}

func createClient(t *testing.T, s *Server, akp nkeys.KeyPair) (*client, *bufio.Reader, string) {
Expand Down Expand Up @@ -2318,3 +2314,81 @@ func TestJWTCircularAccountServiceImport(t *testing.T) {
parseAsync("SUB foo 1\r\nPING\r\n")
expectPong(cr)
}

// This test ensures that connected clients are properly evicted
// (no deadlock) if the max conns of an account has been lowered
// and the account is being updated (following expiration during
// a lookup).
func TestJWTAccountLimitsMaxConnsAfterExpired(t *testing.T) {
s := opTrustBasicSetup()
defer s.Shutdown()
buildMemAccResolver(s)

okp, _ := nkeys.FromSeed(oSeed)

// Create accounts and imports/exports.
fooKP, _ := nkeys.CreateAccount()
fooPub, _ := fooKP.PublicKey()
fooAC := jwt.NewAccountClaims(fooPub)
fooAC.Limits.Conn = 10
fooJWT, err := fooAC.Encode(okp)
if err != nil {
t.Fatalf("Error generating account JWT: %v", err)
}
addAccountToMemResolver(s, fooPub, fooJWT)

newClient := func(expPre string) {
t.Helper()
// Create a client.
c, cr, cs := createClient(t, s, fooKP)
go c.parse([]byte(cs))
l, _ := cr.ReadString('\n')
if !strings.HasPrefix(l, expPre) {
t.Fatalf("Expected a response starting with %q, got %q", expPre, l)
}
go func() {
for {
if _, _, err := cr.ReadLine(); err != nil {
return
}
}
}()
}

for i := 0; i < 4; i++ {
newClient("PONG")
}

// We will simulate that the account has expired. When
// a new client will connect, the server will do a lookup
// and find the account expired, which then will cause
// a fetch and a rebuild of the account. Since max conns
// is now lower, some clients should have been removed.
acc, _ := s.LookupAccount(fooPub)
acc.mu.Lock()
acc.expired = true
acc.mu.Unlock()

// Now update with new expiration and max connections lowered to 2
fooAC.Limits.Conn = 2
fooJWT, err = fooAC.Encode(okp)
if err != nil {
t.Fatalf("Error generating account JWT: %v", err)
}
addAccountToMemResolver(s, fooPub, fooJWT)

// Cause the lookup that will detect that account was expired
// and rebuild it, and kick clients out.
newClient("-ERR ")

acc, _ = s.LookupAccount(fooPub)
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
acc.mu.RLock()
numClients := len(acc.clients)
acc.mu.RUnlock()
if numClients != 2 {
return fmt.Errorf("Should have 2 clients, got %v", numClients)
}
return nil
})
}
5 changes: 4 additions & 1 deletion server/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,8 @@ func (s *Server) reloadAuthorization() {
acc.mu.RLock()
accName := acc.Name
acc.mu.RUnlock()
// Release server lock for following actions
s.mu.Unlock()
accClaims, claimJWT, _ := s.fetchAccountClaims(accName)
if accClaims != nil {
err := s.updateAccountWithClaimJWT(acc, claimJWT)
Expand All @@ -923,9 +925,10 @@ func (s *Server) reloadAuthorization() {
s.Noticef("Reloaded: deleting account [removed]: %q", accName)
s.accounts.Delete(k)
}
// Regrab server lock.
s.mu.Lock()
return true
})

}
}

Expand Down
Loading