
Commit 461ab96

Additional fix to #631
This is the result of flapping tests in go-nats that were caused by a defect (see PR https://github.com/nats-io/go-nats/pull/348). However, during debugging, I realized that there were also things that were not quite right on the server side. This change should make the notification of cluster topology changes to clients more robust.
1 parent: 33db08e

4 files changed, +195 -120 lines

server/client.go (+41 -28)

@@ -55,7 +55,6 @@ type clientFlag byte
 const (
     connectReceived   clientFlag = 1 << iota // The CONNECT proto has been received
     firstPongSent                            // The first PONG has been sent
-    infoUpdated                              // The server's Info object has changed before first PONG was sent
     handshakeComplete                        // For TLS clients, indicate that the handshake is complete
 )
 

@@ -80,10 +79,12 @@ func (cf *clientFlag) setIfNotSet(c clientFlag) bool {
     return false
 }
 
+// Commenting out for now otherwise megacheck complains.
+// We may need that in the future.
 // clear unset the flag (would be equivalent to set the boolean to false)
-func (cf *clientFlag) clear(c clientFlag) {
-    *cf &= ^c
-}
+// func (cf *clientFlag) clear(c clientFlag) {
+//     *cf &= ^c
+// }
 
 type client struct {
     // Here first because of use of atomics, and memory alignment.
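
For context, clientFlag is a single-byte bitmask, so each connection state above costs one bit and setIfNotSet doubles as a cheap first-time check. A minimal standalone sketch of the pattern (the names mirror the diff, but this is an illustration rather than the server's actual file):

package main

import "fmt"

type clientFlag byte

const (
    connectReceived clientFlag = 1 << iota // CONNECT received
    firstPongSent                          // first PONG sent
    handshakeComplete                      // TLS handshake done
)

// isSet reports whether flag c is currently set.
func (cf clientFlag) isSet(c clientFlag) bool { return cf&c != 0 }

// set turns flag c on.
func (cf *clientFlag) set(c clientFlag) { *cf |= c }

// setIfNotSet sets c and returns true only if it was not already set,
// which is how processPing detects the very first PING from a client.
func (cf *clientFlag) setIfNotSet(c clientFlag) bool {
    if *cf&c == 0 {
        *cf |= c
        return true
    }
    return false
}

func main() {
    var f clientFlag
    fmt.Println(f.setIfNotSet(firstPongSent)) // true: first call
    fmt.Println(f.setIfNotSet(firstPongSent)) // false: already set
    fmt.Println(f.isSet(connectReceived))     // false: never set
}
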
@@ -579,37 +580,50 @@ func (c *client) processPing() {
         return
     }
     c.traceOutOp("PONG", nil)
-    err := c.sendProto([]byte("PONG\r\n"), true)
-    if err != nil {
+    if err := c.sendProto([]byte("PONG\r\n"), true); err != nil {
         c.clearConnection()
         c.Debugf("Error on Flush, error %s", err.Error())
+        c.mu.Unlock()
+        return
     }
-    srv := c.srv
-    sendUpdateINFO := false
-    // Check if this is the first PONG, if so...
-    if c.flags.setIfNotSet(firstPongSent) {
-        // Check if server should send an async INFO protocol to the client
-        if c.opts.Protocol >= ClientProtoInfo &&
-            srv != nil && c.flags.isSet(infoUpdated) {
-            sendUpdateINFO = true
-        }
-        // We can now clear the flag
-        c.flags.clear(infoUpdated)
+    // The CONNECT should have been received, but make sure it
+    // is so before proceeding
+    if !c.flags.isSet(connectReceived) {
+        c.mu.Unlock()
+        return
+    }
+    // If we are here, the CONNECT has been received so we know
+    // if this client supports async INFO or not.
+    var (
+        checkClusterChange bool
+        srv                = c.srv
+    )
+    // For older clients, just flip the firstPongSent flag if not already
+    // set and we are done.
+    if c.opts.Protocol < ClientProtoInfo || srv == nil {
+        c.flags.setIfNotSet(firstPongSent)
+    } else {
+        // This is a client that supports async INFO protocols.
+        // If this is the first PING (so firstPongSent is not set yet),
+        // we will need to check if there was a change in cluster topology.
+        checkClusterChange = !c.flags.isSet(firstPongSent)
     }
     c.mu.Unlock()
 
-    // Some clients send an initial PING as part of the synchronous connect process.
-    // They can't be receiving anything until the first PONG is received.
-    // So we delay the possible updated INFO after this point.
-    if sendUpdateINFO {
+    if checkClusterChange {
         srv.mu.Lock()
-        // Use the cached protocol
-        proto := srv.infoJSON
-        srv.mu.Unlock()
-
         c.mu.Lock()
-        c.sendInfo(proto)
+        // Now that we are under both locks, we can flip the flag.
+        // This prevents sendAsyncInfoToClients() and code here
+        // from sending a double INFO protocol.
+        c.flags.set(firstPongSent)
+        // If there was a cluster update since this client was created,
+        // send an updated INFO protocol now.
+        if srv.lastCURLsUpdate >= c.start.UnixNano() {
+            c.sendInfo(srv.infoJSON)
+        }
         c.mu.Unlock()
+        srv.mu.Unlock()
     }
 }
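
The important change here is where firstPongSent is flipped: under both the server lock and the client lock, in the same srv.mu-then-c.mu order that sendAsyncInfoToClients uses, so the two paths can never both send the INFO. A simplified sketch of that ordering, assuming stand-in types (srvT, connT, and a plain bool flag are illustrative, not the real server structs):

package main

import (
    "sync"
    "time"
)

type srvT struct {
    mu              sync.Mutex
    infoJSON        []byte
    lastCURLsUpdate int64 // UnixNano of the last connect-URLs change
}

type connT struct {
    mu            sync.Mutex
    firstPongSent bool
    start         time.Time
    srv           *srvT
}

// sendInfo would write the INFO protocol to the connection; stubbed here.
func (c *connT) sendInfo(b []byte) {}

// firstPing mirrors the tail of processPing in the diff. The lock order
// is always srv.mu then c.mu, the same order the async path uses, so the
// flag flip and the send are atomic with respect to it.
func (c *connT) firstPing() {
    srv := c.srv
    srv.mu.Lock()
    c.mu.Lock()
    // Under both locks: either the async path already saw the flag set
    // and sent the INFO, or we send it below, never both.
    c.firstPongSent = true
    if srv.lastCURLsUpdate >= c.start.UnixNano() {
        // Cluster topology changed after this client was created.
        c.sendInfo(srv.infoJSON)
    }
    c.mu.Unlock()
    srv.mu.Unlock()
}

func main() {
    s := &srvT{lastCURLsUpdate: time.Now().UnixNano()}
    c := &connT{start: time.Now().Add(-time.Second), srv: s}
    c.firstPing() // sends: the URLs changed after this client's start time
}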

@@ -1344,8 +1358,7 @@ func (c *client) closeConnection() {
         // Unless disabled, possibly update the server's INFO protocol
         // and send to clients that know how to handle async INFOs.
         if !srv.getOpts().Cluster.NoAdvertise {
-            srv.removeClientConnectURLs(connectURLs)
-            srv.sendAsyncInfoToClients()
+            srv.removeClientConnectURLsAndSendINFOToClients(connectURLs)
         }
     }
 
server/route.go (+10 -33)

@@ -168,8 +168,7 @@ func (c *client) processRouteInfo(info *Info) {
         // Unless disabled, possibly update the server's INFO protocol
         // and send to clients that know how to handle async INFOs.
         if !s.getOpts().Cluster.NoAdvertise {
-            s.addClientConnectURLs(info.ClientConnectURLs)
-            s.sendAsyncInfoToClients()
+            s.addClientConnectURLsAndSendINFOToClients(info.ClientConnectURLs)
         }
     } else {
         c.Debugf("Detected duplicate remote route %q", info.ID)
@@ -179,46 +178,24 @@ func (c *client) processRouteInfo(info *Info) {
 
 // sendAsyncInfoToClients sends an INFO protocol to all
 // connected clients that accept async INFO updates.
+// The server lock is held on entry.
 func (s *Server) sendAsyncInfoToClients() {
-    s.mu.Lock()
     // If there are no clients supporting async INFO protocols, we are done.
     // Also don't send if we are shutting down...
     if s.cproto == 0 || s.shutdown {
-        s.mu.Unlock()
         return
     }
 
-    // Capture under lock
-    proto := s.infoJSON
-
-    // Make a copy of ALL clients so we can release server lock while
-    // sending the protocol to clients. We could check the conditions
-    // (proto support, first PONG sent) here and so have potentially
-    // a limited number of clients, but that would mean grabbing the
-    // client's lock here, which we don't want since we would still
-    // need it in the second loop.
-    clients := make([]*client, 0, len(s.clients))
     for _, c := range s.clients {
-        clients = append(clients, c)
-    }
-    s.mu.Unlock()
-
-    for _, c := range clients {
         c.mu.Lock()
-        // If server did not yet receive the CONNECT protocol, check later
-        // when sending the first PONG.
-        if !c.flags.isSet(connectReceived) {
-            c.flags.set(infoUpdated)
-        } else if c.opts.Protocol >= ClientProtoInfo {
-            // Send only if first PONG was sent
-            if c.flags.isSet(firstPongSent) {
-                // sendInfo takes care of checking if the connection is still
-                // valid or not, so don't duplicate tests here.
-                c.sendInfo(proto)
-            } else {
-                // Otherwise, notify that INFO has changed and check later.
-                c.flags.set(infoUpdated)
-            }
+        // Here, we are going to send only to the clients that are fully
+        // registered (server has received CONNECT and first PING). For
+        // clients that are not at this stage, this will happen in the
+        // processing of the first PING (see client.processPing)
+        if c.opts.Protocol >= ClientProtoInfo && c.flags.isSet(firstPongSent) {
+            // sendInfo takes care of checking if the connection is still
+            // valid or not, so don't duplicate tests here.
+            c.sendInfo(s.infoJSON)
         }
         c.mu.Unlock()
     }
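
sendAsyncInfoToClients now expects s.mu to be held on entry, so both call sites shown above (closeConnection and processRouteInfo) pair the connect-URL update with the notification under a single hold of the lock via removeClientConnectURLsAndSendINFOToClients / addClientConnectURLsAndSendINFOToClients. A sketch of the assumed shape of the add variant; the clientConnectURLs field and generateInfoJSON helper are guesses for illustration, and only the names visible in the diff are real:

package main

import (
    "sync"
    "time"
)

type Server struct {
    mu                sync.Mutex
    clientConnectURLs map[string]struct{} // assumed representation
    infoJSON          []byte
    lastCURLsUpdate   int64
}

// generateInfoJSON would rebuild the cached s.infoJSON; stubbed here.
func (s *Server) generateInfoJSON() {}

// sendAsyncInfoToClients: per the new contract, s.mu is held on entry.
func (s *Server) sendAsyncInfoToClients() { /* loop over clients as in the diff */ }

// Assumed shape of the combined helper: mutate the URL set, regenerate
// the cached INFO, stamp the update time (which processPing compares
// against c.start), and notify clients, all under a single hold of
// s.mu, so a client can never observe the URLs and INFO out of sync.
func (s *Server) addClientConnectURLsAndSendINFOToClients(urls []string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    for _, u := range urls {
        s.clientConnectURLs[u] = struct{}{}
    }
    s.generateInfoJSON()
    s.lastCURLsUpdate = time.Now().UnixNano()
    s.sendAsyncInfoToClients()
}

func main() {
    s := &Server{clientConnectURLs: make(map[string]struct{})}
    s.addClientConnectURLsAndSendINFOToClients([]string{"127.0.0.1:4223"})
}

The remove variant would delete from the set instead; everything else stays the same.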

server/routes_test.go (+123 -48)

@@ -729,57 +729,132 @@ func TestRoutesToEachOther(t *testing.T) {
     }
 }
 
-func TestConnectULRsWithRoutesToEachOther(t *testing.T) {
-    optsA := DefaultOptions()
-    optsA.Host = "127.0.0.1"
-    optsA.Cluster.Port = 7246
-    optsA.Routes = RoutesFromStr("nats://127.0.0.1:7247")
-
-    optsB := DefaultOptions()
-    optsB.Host = "127.0.0.1"
-    optsB.Cluster.Port = 7247
-    optsB.Routes = RoutesFromStr("nats://127.0.0.1:7246")
-
-    // Start servers with go routines to increase change of
-    // each server connecting to each other at the same time.
-    srvA := New(optsA)
-    defer srvA.Shutdown()
-
-    srvB := New(optsB)
-    defer srvB.Shutdown()
-
-    go srvA.Start()
-    go srvB.Start()
-
-    // Wait for cluster to be formed
-    checkClusterFormed(t, srvA, srvB)
-
-    // Connect to serverB
-    url := fmt.Sprintf("nats://%s", srvB.Addr().String())
-    nc, err := nats.Connect(url)
-    if err != nil {
-        t.Fatalf("Error on connect: %v", err)
-    }
-    defer nc.Close()
-    ds := nc.Servers()
-    if len(ds) != 2 {
-        t.Fatalf("Expected 2 servers, got %v", ds)
+func wait(ch chan bool) error {
+    select {
+    case <-ch:
+        return nil
+    case <-time.After(5 * time.Second):
     }
+    return fmt.Errorf("timeout")
+}
 
-    // Shutdown server A and make sure that we are notfied
-    // that server A is no longer running.
-    srvA.Shutdown()
-    timeout := time.Now().Add(5 * time.Second)
-    ok := false
-    for time.Now().Before(timeout) {
-        ds = nc.Servers()
-        if len(ds) == 1 {
-            ok = true
-            break
+func TestServerPoolUpdatedWhenRouteGoesAway(t *testing.T) {
+    s1Opts := DefaultOptions()
+    s1Opts.Host = "127.0.0.1"
+    s1Opts.Port = 4222
+    s1Opts.Cluster.Host = "127.0.0.1"
+    s1Opts.Cluster.Port = 6222
+    s1Opts.Routes = RoutesFromStr("nats://127.0.0.1:6223,nats://127.0.0.1:6224")
+    s1 := RunServer(s1Opts)
+    defer s1.Shutdown()
+
+    s1Url := "nats://127.0.0.1:4222"
+    s2Url := "nats://127.0.0.1:4223"
+    s3Url := "nats://127.0.0.1:4224"
+
+    ch := make(chan bool, 1)
+    chch := make(chan bool, 1)
+    connHandler := func(_ *nats.Conn) {
+        chch <- true
+    }
+    nc, err := nats.Connect(s1Url,
+        nats.ReconnectHandler(connHandler),
+        nats.DiscoveredServersHandler(func(_ *nats.Conn) {
+            ch <- true
+        }))
+    if err != nil {
+        t.Fatalf("Error on connect")
+    }
+
+    s2Opts := DefaultOptions()
+    s2Opts.Host = "127.0.0.1"
+    s2Opts.Port = s1Opts.Port + 1
+    s2Opts.Cluster.Host = "127.0.0.1"
+    s2Opts.Cluster.Port = 6223
+    s2Opts.Routes = RoutesFromStr("nats://127.0.0.1:6222,nats://127.0.0.1:6224")
+    s2 := RunServer(s2Opts)
+    defer s2.Shutdown()
+
+    // Wait to be notified
+    if err := wait(ch); err != nil {
+        t.Fatal("New server callback was not invoked")
+    }
+
+    checkPool := func(expected []string) {
+        // Don't use discovered here, but Servers to have the full list.
+        // Also, there may be cases where the mesh is not formed yet,
+        // so try again on failure.
+        var (
+            ds      []string
+            timeout = time.Now().Add(5 * time.Second)
+        )
+        for time.Now().Before(timeout) {
+            ds = nc.Servers()
+            if len(ds) == len(expected) {
+                m := make(map[string]struct{}, len(ds))
+                for _, url := range ds {
+                    m[url] = struct{}{}
+                }
+                ok := true
+                for _, url := range expected {
+                    if _, present := m[url]; !present {
+                        ok = false
+                        break
+                    }
+                }
+                if ok {
+                    return
+                }
+            }
+            time.Sleep(50 * time.Millisecond)
         }
-        time.Sleep(50 * time.Millisecond)
+        stackFatalf(t, "Expected %v, got %v", expected, ds)
+    }
+    // Verify that we now know about s2
+    checkPool([]string{s1Url, s2Url})
+
+    s3Opts := DefaultOptions()
+    s3Opts.Host = "127.0.0.1"
+    s3Opts.Port = s2Opts.Port + 1
+    s3Opts.Cluster.Host = "127.0.0.1"
+    s3Opts.Cluster.Port = 6224
+    s3Opts.Routes = RoutesFromStr("nats://127.0.0.1:6222,nats://127.0.0.1:6223")
+    s3 := RunServer(s3Opts)
+    defer s3.Shutdown()
+
+    // Wait to be notified
+    if err := wait(ch); err != nil {
+        t.Fatal("New server callback was not invoked")
+    }
+    // Verify that we now know about s3
+    checkPool([]string{s1Url, s2Url, s3Url})
+
+    // Stop s1. Since this was passed to the Connect() call, this one should
+    // still be present.
+    s1.Shutdown()
+    // Wait for reconnect
+    if err := wait(chch); err != nil {
+        t.Fatal("Reconnect handler not invoked")
+    }
+    checkPool([]string{s1Url, s2Url, s3Url})
+
+    // Check the server we reconnected to.
+    reConnectedTo := nc.ConnectedUrl()
+    expected := []string{s1Url}
+    if reConnectedTo == s2Url {
+        s2.Shutdown()
+        expected = append(expected, s3Url)
+    } else if reConnectedTo == s3Url {
+        s3.Shutdown()
+        expected = append(expected, s2Url)
+    } else {
+        t.Fatalf("Unexpected server client has reconnected to: %v", reConnectedTo)
     }
-    if !ok {
-        t.Fatalf("List of servers should be only 1, got %v", ds)
+    // Wait for reconnect
+    if err := wait(chch); err != nil {
+        t.Fatal("Reconnect handler not invoked")
     }
+    // The implicit server that we just shutdown should have been removed from the pool
+    checkPool(expected)
+    nc.Close()
 }
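
The new test drives this end to end through go-nats. For reference, a minimal client using the same options the test exercises; the URL is just an example, and nats.DiscoveredServersHandler fires when an async INFO adds servers to the client's pool:

package main

import (
    "fmt"
    "log"

    "github.com/nats-io/go-nats"
)

func main() {
    nc, err := nats.Connect("nats://127.0.0.1:4222",
        nats.ReconnectHandler(func(_ *nats.Conn) {
            fmt.Println("reconnected")
        }),
        nats.DiscoveredServersHandler(func(c *nats.Conn) {
            // Invoked when an async INFO grows the server pool.
            fmt.Println("known servers:", c.Servers())
        }))
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()
    select {} // block so the handlers can fire
}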
