Skip to content

Commit b45eb1e

Browse files
committed
[ADDED] RTT in routez's route info
Added the RTT field to each route reported in routez. Ensure that when a route is accepted, we send a PING to compute the first RTT and don't have to wait for the ping timer to fire. Signed-off-by: Ivan Kozlovic <[email protected]>
1 parent 25e15d7 commit b45eb1e

8 files changed

+166
-29
lines changed

server/monitor.go

+2
Original file line numberDiff line numberDiff line change
@@ -558,6 +558,7 @@ type RouteInfo struct {
558558
Import *SubjectPermission `json:"import,omitempty"`
559559
Export *SubjectPermission `json:"export,omitempty"`
560560
Pending int `json:"pending_size"`
561+
RTT string `json:"rtt,omitempty"`
561562
InMsgs int64 `json:"in_msgs"`
562563
OutMsgs int64 `json:"out_msgs"`
563564
InBytes int64 `json:"in_bytes"`
@@ -600,6 +601,7 @@ func (s *Server) Routez(routezOpts *RoutezOptions) (*Routez, error) {
600601
NumSubs: uint32(len(r.subs)),
601602
Import: r.opts.Import,
602603
Export: r.opts.Export,
604+
RTT: r.getRTT(),
603605
}
604606

605607
if subs && len(r.subs) > 0 {

server/monitor_test.go

+36
Original file line numberDiff line numberDiff line change
@@ -3123,3 +3123,39 @@ func TestMonitorGatewayzAccounts(t *testing.T) {
31233123
return nil
31243124
})
31253125
}
3126+
3127+
func TestMonitorRouteRTT(t *testing.T) {
3128+
// Do not change default PingInterval and expect RTT
3129+
// to still be reported
3130+
3131+
ob := DefaultOptions()
3132+
sb := RunServer(ob)
3133+
defer sb.Shutdown()
3134+
3135+
oa := DefaultOptions()
3136+
oa.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", ob.Cluster.Host, ob.Cluster.Port))
3137+
sa := RunServer(oa)
3138+
defer sa.Shutdown()
3139+
3140+
checkClusterFormed(t, sa, sb)
3141+
3142+
checkRouteInfo := func(t *testing.T, s *Server) {
3143+
t.Helper()
3144+
routezURL := fmt.Sprintf("http://127.0.0.1:%d/routez", s.MonitorAddr().Port)
3145+
for pollMode := 0; pollMode < 2; pollMode++ {
3146+
checkFor(t, time.Second, 15*time.Millisecond, func() error {
3147+
rz := pollRoutez(t, s, pollMode, routezURL, nil)
3148+
if len(rz.Routes) != 1 {
3149+
return fmt.Errorf("Expected 1 route, got %v", len(rz.Routes))
3150+
}
3151+
ri := rz.Routes[0]
3152+
if ri.RTT == _EMPTY_ {
3153+
return fmt.Errorf("Route's RTT not reported")
3154+
}
3155+
return nil
3156+
})
3157+
}
3158+
}
3159+
checkRouteInfo(t, sa)
3160+
checkRouteInfo(t, sb)
3161+
}

server/route.go

+6
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,12 @@ func (c *client) processRouteInfo(info *Info) {
485485
if !s.getOpts().Cluster.NoAdvertise {
486486
s.addClientConnectURLsAndSendINFOToClients(info.ClientConnectURLs)
487487
}
488+
489+
// This will allow us to determine the initial RTT without having to
490+
// wait for first timer based PING.
491+
c.mu.Lock()
492+
c.sendPing()
493+
c.mu.Unlock()
488494
} else {
489495
c.Debugf("Detected duplicate remote route %q", info.ID)
490496
c.closeConnection(DuplicateRoute)

server/routes_test.go

+64
Original file line numberDiff line numberDiff line change
@@ -1078,3 +1078,67 @@ func TestRouteNoCrashOnAddingSubToRoute(t *testing.T) {
10781078

10791079
waitCh(t, ch, "Did not get all messages")
10801080
}
1081+
1082+
func TestRouteRTT(t *testing.T) {
1083+
checkRTT := func(t *testing.T, s *Server, checkForUpdate bool) {
1084+
t.Helper()
1085+
var route *client
1086+
s.mu.Lock()
1087+
for _, r := range s.routes {
1088+
route = r
1089+
break
1090+
}
1091+
s.mu.Unlock()
1092+
1093+
prev := time.Duration(0)
1094+
check := func(t *testing.T) {
1095+
t.Helper()
1096+
checkFor(t, time.Second, 15*time.Millisecond, func() error {
1097+
route.mu.Lock()
1098+
rtt := route.rtt
1099+
route.mu.Unlock()
1100+
if rtt == 0 || rtt == prev {
1101+
return fmt.Errorf("RTT probably not tracked")
1102+
}
1103+
prev = rtt
1104+
return nil
1105+
})
1106+
}
1107+
check(t)
1108+
if checkForUpdate {
1109+
// Wait a bit and check that rtt is updated
1110+
time.Sleep(30 * time.Millisecond)
1111+
check(t)
1112+
}
1113+
}
1114+
1115+
ob := DefaultOptions()
1116+
ob.PingInterval = 15 * time.Millisecond
1117+
sb := RunServer(ob)
1118+
defer sb.Shutdown()
1119+
1120+
oa := DefaultOptions()
1121+
oa.PingInterval = 15 * time.Millisecond
1122+
oa.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", ob.Cluster.Host, ob.Cluster.Port))
1123+
sa := RunServer(oa)
1124+
defer sa.Shutdown()
1125+
1126+
checkClusterFormed(t, sa, sb)
1127+
checkRTT(t, sa, true)
1128+
checkRTT(t, sb, true)
1129+
1130+
sa.Shutdown()
1131+
sb.Shutdown()
1132+
// Now check that initial RTT is computed prior to first PingInterval
1133+
ob.PingInterval = time.Minute
1134+
sb = RunServer(ob)
1135+
defer sb.Shutdown()
1136+
1137+
oa.PingInterval = time.Minute
1138+
oa.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", ob.Cluster.Host, ob.Cluster.Port))
1139+
sa = RunServer(oa)
1140+
defer sa.Shutdown()
1141+
1142+
checkRTT(t, sa, false)
1143+
checkRTT(t, sb, false)
1144+
}

test/new_routes_test.go

+44-12
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package test
1515

1616
import (
17+
"bytes"
1718
"context"
1819
"encoding/json"
1920
"fmt"
@@ -28,6 +29,14 @@ import (
2829
"github.com/nats-io/nuid"
2930
)
3031

32+
func routeSendInfo(b []byte, routeSend sendFun, routeExpect expectFun) {
33+
routeSend(fmt.Sprintf("INFO %s\r\n", b))
34+
// When a server receives an INFO from a route, it will send a PING
35+
// to compute the RTT.
36+
routeExpect(pingRe)
37+
routeSend("PONG\r\n")
38+
}
39+
3140
func runNewRouteServer(t *testing.T) (*server.Server, *server.Options) {
3241
return RunServerWithConfig("./configs/new_cluster.conf")
3342
}
@@ -55,6 +64,18 @@ func TestNewRouteInfoOnConnect(t *testing.T) {
5564
}
5665
}
5766

67+
func getRSubBuffer(t *testing.T, routeExpect expectFun) (bool, []byte) {
68+
t.Helper()
69+
buf := routeExpect(rsubRe)
70+
// See if PING has been read along with the RS+'s
71+
gotPing := false
72+
if idx := bytes.Index(buf, []byte("PING\r\n")); idx != -1 {
73+
buf = buf[:idx]
74+
gotPing = true
75+
}
76+
return gotPing, buf
77+
}
78+
5879
func TestNewRouteConnectSubs(t *testing.T) {
5980
s, opts := runNewRouteServer(t)
6081
defer s.Shutdown()
@@ -89,7 +110,7 @@ func TestNewRouteConnectSubs(t *testing.T) {
89110
}
90111
routeSend(fmt.Sprintf("INFO %s\r\n", b))
91112

92-
buf := routeExpect(rsubRe)
113+
gotPing, buf := getRSubBuffer(t, routeExpect)
93114

94115
matches := rsubRe.FindAllSubmatch(buf, -1)
95116
if len(matches) != 2 {
@@ -116,6 +137,11 @@ func TestNewRouteConnectSubs(t *testing.T) {
116137
}
117138
}
118139

140+
// Wait for PING if we did not get it yet.
141+
if !gotPing {
142+
routeExpect(pingRe)
143+
}
144+
119145
// Close the client connection, check the results.
120146
c.Close()
121147

@@ -163,7 +189,7 @@ func TestNewRouteConnectSubsWithAccount(t *testing.T) {
163189
}
164190
routeSend(fmt.Sprintf("INFO %s\r\n", b))
165191

166-
buf := routeExpect(rsubRe)
192+
gotPing, buf := getRSubBuffer(t, routeExpect)
167193

168194
matches := rsubRe.FindAllSubmatch(buf, -1)
169195
if len(matches) != 2 {
@@ -190,6 +216,11 @@ func TestNewRouteConnectSubsWithAccount(t *testing.T) {
190216
}
191217
}
192218

219+
// Wait for PING if we did not get it yet.
220+
if !gotPing {
221+
routeExpect(pingRe)
222+
}
223+
193224
// Close the client connection, check the results.
194225
c.Close()
195226

@@ -240,7 +271,8 @@ func TestNewRouteRSubs(t *testing.T) {
240271
if err != nil {
241272
t.Fatalf("Could not marshal test route info: %v", err)
242273
}
243-
routeSend(fmt.Sprintf("INFO %s\r\nPING\r\n", b))
274+
routeSendInfo(b, routeSend, routeExpect)
275+
routeSend("PING\r\n")
244276
routeExpect(pongRe)
245277

246278
// Have the client listen on foo.
@@ -317,7 +349,7 @@ func TestNewRouteProgressiveNormalSubs(t *testing.T) {
317349
if err != nil {
318350
t.Fatalf("Could not marshal test route info: %v", err)
319351
}
320-
routeSend(fmt.Sprintf("INFO %s\r\n", b))
352+
routeSendInfo(b, routeSend, routeExpect)
321353
routeSend("PING\r\n")
322354
routeExpect(pongRe)
323355

@@ -414,8 +446,7 @@ func TestNewRouteClientClosedWithNormalSubscriptions(t *testing.T) {
414446
if err != nil {
415447
t.Fatalf("Could not marshal test route info: %v", err)
416448
}
417-
routeSend(fmt.Sprintf("INFO %s\r\n", b))
418-
449+
routeSendInfo(b, routeSend, routeExpect)
419450
routeSend("PING\r\n")
420451
routeExpect(pongRe)
421452

@@ -464,8 +495,7 @@ func TestNewRouteClientClosedWithQueueSubscriptions(t *testing.T) {
464495
if err != nil {
465496
t.Fatalf("Could not marshal test route info: %v", err)
466497
}
467-
routeSend(fmt.Sprintf("INFO %s\r\n", b))
468-
498+
routeSendInfo(b, routeSend, routeExpect)
469499
routeSend("PING\r\n")
470500
routeExpect(pongRe)
471501

@@ -512,7 +542,7 @@ func TestNewRouteRUnsubAccountSpecific(t *testing.T) {
512542
if err != nil {
513543
t.Fatalf("Could not marshal test route info: %v", err)
514544
}
515-
routeSend(fmt.Sprintf("INFO %s\r\n", b))
545+
routeSendInfo(b, routeSend, routeExpect)
516546

517547
// Now create 500 subs on same subject but all different accounts.
518548
for i := 0; i < 500; i++ {
@@ -568,7 +598,7 @@ func TestNewRouteRSubCleanupOnDisconnect(t *testing.T) {
568598
if err != nil {
569599
t.Fatalf("Could not marshal test route info: %v", err)
570600
}
571-
routeSend(fmt.Sprintf("INFO %s\r\n", b))
601+
routeSendInfo(b, routeSend, routeExpect)
572602

573603
// Now create 100 subs on 3 different accounts.
574604
for i := 0; i < 100; i++ {
@@ -606,7 +636,8 @@ func TestNewRouteSendSubsAndMsgs(t *testing.T) {
606636
if err != nil {
607637
t.Fatalf("Could not marshal test route info: %v", err)
608638
}
609-
routeSend(fmt.Sprintf("INFO %s\r\nPING\r\n", b))
639+
routeSendInfo(b, routeSend, routeExpect)
640+
routeSend("PING\r\n")
610641
routeExpect(pongRe)
611642

612643
// Now let's send in interest from the new protocol.
@@ -733,7 +764,8 @@ func TestNewRouteProcessRoutedMsgs(t *testing.T) {
733764
if err != nil {
734765
t.Fatalf("Could not marshal test route info: %v", err)
735766
}
736-
routeSend(fmt.Sprintf("INFO %s\r\nPING\r\n", b))
767+
routeSendInfo(b, routeSend, routeExpect)
768+
routeSend("PING\r\n")
737769
routeExpect(pongRe)
738770

739771
// Create a client

test/norace_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ func TestQueueSubWeightOrderMultipleConnections(t *testing.T) {
359359
if err != nil {
360360
t.Fatalf("Could not marshal test route info: %v", err)
361361
}
362-
routeSend(fmt.Sprintf("INFO %s\r\n", b))
362+
routeSendInfo(b, routeSend, routeExpect)
363363

364364
start := make(chan bool)
365365
for _, nc := range clients {

test/route_discovery_test.go

+6-8
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,7 @@ func TestSeedMultipleRouteInfo(t *testing.T) {
7474
// register ourselves via INFO
7575
r1Info := server.Info{ID: rc1ID, Host: rc1Host, Port: rc1Port}
7676
b, _ := json.Marshal(r1Info)
77-
infoJSON := fmt.Sprintf(server.InfoProto, b)
78-
routeSend1(infoJSON)
77+
routeSendInfo(b, routeSend1, route1Expect)
7978
routeSend1("PING\r\n")
8079
route1Expect(pongRe)
8180

@@ -93,7 +92,7 @@ func TestSeedMultipleRouteInfo(t *testing.T) {
9392
// register ourselves via INFO
9493
r2Info := server.Info{ID: rc2ID, Host: rc2Host, Port: rc2Port}
9594
b, _ = json.Marshal(r2Info)
96-
infoJSON = fmt.Sprintf(server.InfoProto, b)
95+
infoJSON := fmt.Sprintf(server.InfoProto, b)
9796
routeSend2(infoJSON)
9897

9998
// Now read back the second INFO route1 should receive letting
@@ -556,8 +555,7 @@ func TestSeedReturnIPInInfo(t *testing.T) {
556555
// register ourselves via INFO
557556
r1Info := server.Info{ID: rc1ID, Host: rc1Host, Port: rc1Port}
558557
b, _ := json.Marshal(r1Info)
559-
infoJSON := fmt.Sprintf(server.InfoProto, b)
560-
routeSend1(infoJSON)
558+
routeSendInfo(b, routeSend1, route1Expect)
561559
routeSend1("PING\r\n")
562560
route1Expect(pongRe)
563561

@@ -573,7 +571,7 @@ func TestSeedReturnIPInInfo(t *testing.T) {
573571
// register ourselves via INFO
574572
r2Info := server.Info{ID: rc2ID, Host: rc2Host, Port: rc2Port}
575573
b, _ = json.Marshal(r2Info)
576-
infoJSON = fmt.Sprintf(server.InfoProto, b)
574+
infoJSON := fmt.Sprintf(server.InfoProto, b)
577575
routeSend2(infoJSON)
578576

579577
// Now read info that route1 should have received from the seed
@@ -621,8 +619,7 @@ func TestImplicitRouteRetry(t *testing.T) {
621619
// register ourselves via INFO
622620
rbInfo := server.Info{ID: rcbID, Host: optsB.Cluster.Host, Port: optsB.Cluster.Port}
623621
b, _ := json.Marshal(rbInfo)
624-
infoJSON := fmt.Sprintf(server.InfoProto, b)
625-
routeBSend(infoJSON)
622+
routeSendInfo(b, routeBSend, routeBExpect)
626623
routeBSend("PING\r\n")
627624
routeBExpect(pongRe)
628625

@@ -634,6 +631,7 @@ func TestImplicitRouteRetry(t *testing.T) {
634631
if err != nil {
635632
t.Fatalf("Error during listen: %v", err)
636633
}
634+
defer rbListen.Close()
637635
c, err := rbListen.Accept()
638636
if err != nil {
639637
t.Fatalf("Error during accept: %v", err)

0 commit comments

Comments
 (0)