Skip to content

Commit 8a0120d

Browse files
authored
Merge pull request #1108 from nats-io/add_leafz
[ADDED] /leafz endpoint
2 parents 6c4a88f + cd4b8d3 commit 8a0120d

File tree

3 files changed

+336
-0
lines changed

3 files changed

+336
-0
lines changed

server/monitor.go

+106
Original file line numberDiff line numberDiff line change
@@ -982,6 +982,7 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) {
982982
<a href=/connz>connz</a><br/>
983983
<a href=/routez>routez</a><br/>
984984
<a href=/gatewayz>gatewayz</a><br/>
985+
<a href=/leafz>leafz</a><br/>
985986
<a href=/subsz>subsz</a><br/>
986987
<br/>
987988
<a href=https://nats-io.github.io/docs/nats_server/monitoring.html>help</a>
@@ -1534,6 +1535,111 @@ func (s *Server) HandleGatewayz(w http.ResponseWriter, r *http.Request) {
15341535
ResponseHandler(w, r, b)
15351536
}
15361537

1538+
// Leafz represents detailed information on Leafnodes.
1539+
type Leafz struct {
1540+
ID string `json:"server_id"`
1541+
Now time.Time `json:"now"`
1542+
NumLeafs int `json:"leafnodes"`
1543+
Leafs []*LeafInfo `json:"leafs"`
1544+
}
1545+
1546+
// LeafzOptions are options passed to Leafz
1547+
type LeafzOptions struct {
1548+
// Subscriptions indicates that Leafz will return a leafnode's subscriptions
1549+
Subscriptions bool `json:"subscriptions"`
1550+
}
1551+
1552+
// LeafInfo has detailed information on each remote leafnode connection.
1553+
type LeafInfo struct {
1554+
Account string `json:"account"`
1555+
IP string `json:"ip"`
1556+
Port int `json:"port"`
1557+
RTT string `json:"rtt,omitempty"`
1558+
InMsgs int64 `json:"in_msgs"`
1559+
OutMsgs int64 `json:"out_msgs"`
1560+
InBytes int64 `json:"in_bytes"`
1561+
OutBytes int64 `json:"out_bytes"`
1562+
NumSubs uint32 `json:"subscriptions"`
1563+
Subs []string `json:"subscriptions_list,omitempty"`
1564+
}
1565+
1566+
// Leafz returns a Leafz structure containing information about leafnodes.
1567+
func (s *Server) Leafz(opts *LeafzOptions) (*Leafz, error) {
1568+
// Grab leafnodes
1569+
var lconns []*client
1570+
s.mu.Lock()
1571+
if len(s.leafs) > 0 {
1572+
lconns = make([]*client, 0, len(s.leafs))
1573+
for _, ln := range s.leafs {
1574+
lconns = append(lconns, ln)
1575+
}
1576+
}
1577+
s.mu.Unlock()
1578+
1579+
var leafnodes []*LeafInfo
1580+
if len(lconns) > 0 {
1581+
leafnodes = make([]*LeafInfo, 0, len(lconns))
1582+
for _, ln := range lconns {
1583+
ln.mu.Lock()
1584+
lni := &LeafInfo{
1585+
Account: ln.acc.Name,
1586+
IP: ln.host,
1587+
Port: int(ln.port),
1588+
RTT: ln.getRTT(),
1589+
InMsgs: atomic.LoadInt64(&ln.inMsgs),
1590+
OutMsgs: ln.outMsgs,
1591+
InBytes: atomic.LoadInt64(&ln.inBytes),
1592+
OutBytes: ln.outBytes,
1593+
NumSubs: uint32(len(ln.subs)),
1594+
}
1595+
if opts != nil && opts.Subscriptions {
1596+
lni.Subs = make([]string, 0, len(ln.subs))
1597+
for _, sub := range ln.subs {
1598+
lni.Subs = append(lni.Subs, string(sub.subject))
1599+
}
1600+
}
1601+
ln.mu.Unlock()
1602+
leafnodes = append(leafnodes, lni)
1603+
}
1604+
}
1605+
return &Leafz{
1606+
ID: s.ID(),
1607+
Now: time.Now(),
1608+
NumLeafs: len(leafnodes),
1609+
Leafs: leafnodes,
1610+
}, nil
1611+
}
1612+
1613+
// HandleLeafz process HTTP requests for leafnode information.
1614+
func (s *Server) HandleLeafz(w http.ResponseWriter, r *http.Request) {
1615+
s.mu.Lock()
1616+
s.httpReqStats[LeafzPath]++
1617+
s.mu.Unlock()
1618+
1619+
subs, err := decodeBool(w, r, "subs")
1620+
if err != nil {
1621+
return
1622+
}
1623+
var opts *LeafzOptions
1624+
if subs {
1625+
opts = &LeafzOptions{Subscriptions: true}
1626+
}
1627+
1628+
l, err := s.Leafz(opts)
1629+
if err != nil {
1630+
w.WriteHeader(http.StatusBadRequest)
1631+
w.Write([]byte(err.Error()))
1632+
return
1633+
}
1634+
b, err := json.MarshalIndent(l, "", " ")
1635+
if err != nil {
1636+
s.Errorf("Error marshaling response to /leafz request: %v", err)
1637+
}
1638+
1639+
// Handle response
1640+
ResponseHandler(w, r, b)
1641+
}
1642+
15371643
// ResponseHandler handles responses for monitoring routes
15381644
func ResponseHandler(w http.ResponseWriter, r *http.Request, data []byte) {
15391645
// Get callback from request

server/monitor_test.go

+227
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ import (
3232
"time"
3333
"unicode"
3434

35+
"github.com/nats-io/jwt"
3536
"github.com/nats-io/nats.go"
37+
"github.com/nats-io/nkeys"
3638
)
3739

3840
const CLIENT_PORT = -1
@@ -3160,3 +3162,228 @@ func TestMonitorRouteRTT(t *testing.T) {
31603162
checkRouteInfo(t, sa)
31613163
checkRouteInfo(t, sb)
31623164
}
3165+
3166+
func pollLeafz(t *testing.T, s *Server, mode int, url string, opts *LeafzOptions) *Leafz {
3167+
t.Helper()
3168+
if mode == 0 {
3169+
l := &Leafz{}
3170+
body := readBody(t, url)
3171+
if err := json.Unmarshal(body, l); err != nil {
3172+
t.Fatalf("Got an error unmarshalling the body: %v\n", err)
3173+
}
3174+
return l
3175+
}
3176+
l, err := s.Leafz(opts)
3177+
if err != nil {
3178+
t.Fatalf("Error on Leafz: %v", err)
3179+
}
3180+
return l
3181+
}
3182+
3183+
func TestMonitorLeafz(t *testing.T) {
3184+
content := `
3185+
listen: "127.0.0.1:-1"
3186+
http: "127.0.0.1:-1"
3187+
operator = "../test/configs/nkeys/op.jwt"
3188+
resolver = MEMORY
3189+
ping_interval = 1
3190+
leafnodes {
3191+
listen: "127.0.0.1:-1"
3192+
}
3193+
`
3194+
conf := createConfFile(t, []byte(content))
3195+
defer os.Remove(conf)
3196+
sb, ob := RunServerWithConfig(conf)
3197+
defer sb.Shutdown()
3198+
3199+
createAcc := func(t *testing.T) (*Account, string) {
3200+
t.Helper()
3201+
acc, akp := createAccount(sb)
3202+
kp, _ := nkeys.CreateUser()
3203+
pub, _ := kp.PublicKey()
3204+
nuc := jwt.NewUserClaims(pub)
3205+
ujwt, err := nuc.Encode(akp)
3206+
if err != nil {
3207+
t.Fatalf("Error generating user JWT: %v", err)
3208+
}
3209+
seed, _ := kp.Seed()
3210+
creds := genCredsFile(t, ujwt, seed)
3211+
return acc, creds
3212+
}
3213+
acc1, mycreds1 := createAcc(t)
3214+
defer os.Remove(mycreds1)
3215+
acc2, mycreds2 := createAcc(t)
3216+
defer os.Remove(mycreds2)
3217+
3218+
content = `
3219+
port: -1
3220+
http: "127.0.0.1:-1"
3221+
ping_interval = 1
3222+
accounts {
3223+
%s {
3224+
users [
3225+
{user: user1, password: pwd}
3226+
]
3227+
}
3228+
%s {
3229+
users [
3230+
{user: user2, password: pwd}
3231+
]
3232+
}
3233+
}
3234+
leafnodes {
3235+
remotes = [
3236+
{
3237+
account: "%s"
3238+
url: nats-leaf://127.0.0.1:%d
3239+
credentials: "%s"
3240+
}
3241+
{
3242+
account: "%s"
3243+
url: nats-leaf://127.0.0.1:%d
3244+
credentials: "%s"
3245+
}
3246+
]
3247+
}
3248+
`
3249+
config := fmt.Sprintf(content,
3250+
acc1.Name, acc2.Name,
3251+
acc1.Name, ob.LeafNode.Port, mycreds1,
3252+
acc2.Name, ob.LeafNode.Port, mycreds2)
3253+
conf = createConfFile(t, []byte(config))
3254+
defer os.Remove(conf)
3255+
sa, oa := RunServerWithConfig(conf)
3256+
defer sa.Shutdown()
3257+
3258+
checkFor(t, time.Second, 15*time.Millisecond, func() error {
3259+
if n := sa.NumLeafNodes(); n != 2 {
3260+
return fmt.Errorf("Expected 2 leaf connections, got %v", n)
3261+
}
3262+
return nil
3263+
})
3264+
3265+
// Wait for initial RTT to be computed
3266+
time.Sleep(firstPingInterval + 500*time.Millisecond)
3267+
3268+
ch := make(chan bool, 1)
3269+
nc1B := natsConnect(t, fmt.Sprintf("nats://127.0.0.1:%d", ob.Port), nats.UserCredentials(mycreds1))
3270+
defer nc1B.Close()
3271+
natsSub(t, nc1B, "foo", func(_ *nats.Msg) { ch <- true })
3272+
natsSub(t, nc1B, "bar", func(_ *nats.Msg) {})
3273+
natsFlush(t, nc1B)
3274+
3275+
nc2B := natsConnect(t, fmt.Sprintf("nats://127.0.0.1:%d", ob.Port), nats.UserCredentials(mycreds2))
3276+
defer nc2B.Close()
3277+
natsSub(t, nc2B, "bar", func(_ *nats.Msg) { ch <- true })
3278+
natsSub(t, nc2B, "foo", func(_ *nats.Msg) {})
3279+
natsFlush(t, nc2B)
3280+
3281+
nc1A := natsConnect(t, fmt.Sprintf("nats://user1:[email protected]:%d", oa.Port))
3282+
defer nc1A.Close()
3283+
natsPub(t, nc1A, "foo", []byte("hello"))
3284+
natsFlush(t, nc1A)
3285+
3286+
waitCh(t, ch, "Did not get the message")
3287+
3288+
nc2A := natsConnect(t, fmt.Sprintf("nats://user2:[email protected]:%d", oa.Port))
3289+
defer nc2A.Close()
3290+
natsPub(t, nc2A, "bar", []byte("hello"))
3291+
natsPub(t, nc2A, "bar", []byte("hello"))
3292+
natsFlush(t, nc2A)
3293+
3294+
waitCh(t, ch, "Did not get the message")
3295+
waitCh(t, ch, "Did not get the message")
3296+
3297+
// Let's poll server A
3298+
pollURL := fmt.Sprintf("http://127.0.0.1:%d/leafz?subs=1", sa.MonitorAddr().Port)
3299+
for pollMode := 1; pollMode < 2; pollMode++ {
3300+
l := pollLeafz(t, sa, pollMode, pollURL, &LeafzOptions{Subscriptions: true})
3301+
if l.ID != sa.ID() {
3302+
t.Fatalf("Expected ID to be %q, got %q", sa.ID(), l.ID)
3303+
}
3304+
if l.Now.IsZero() {
3305+
t.Fatalf("Expected Now to be set, was not")
3306+
}
3307+
if l.NumLeafs != 2 {
3308+
t.Fatalf("Expected NumLeafs to be 2, got %v", l.NumLeafs)
3309+
}
3310+
if len(l.Leafs) != 2 {
3311+
t.Fatalf("Expected array to be len 2, got %v", len(l.Leafs))
3312+
}
3313+
for _, ln := range l.Leafs {
3314+
if ln.Account == acc1.Name {
3315+
if ln.OutMsgs != 1 || ln.OutBytes == 0 || ln.InMsgs != 0 || ln.InBytes != 0 {
3316+
t.Fatalf("Expected 1 OutMsgs/Bytes and 0 InMsgs/Bytes, got %+v", ln)
3317+
}
3318+
} else if ln.Account == acc2.Name {
3319+
if ln.OutMsgs != 2 || ln.OutBytes == 0 || ln.InMsgs != 0 || ln.InBytes != 0 {
3320+
t.Fatalf("Expected 2 OutMsgs/Bytes and 0 InMsgs/Bytes, got %+v", ln)
3321+
}
3322+
} else {
3323+
t.Fatalf("Expected account to be %q or %q, got %q", acc1.Name, acc2.Name, ln.Account)
3324+
}
3325+
if ln.RTT == "" {
3326+
t.Fatalf("RTT not tracked?")
3327+
}
3328+
if ln.NumSubs != 2 {
3329+
t.Fatalf("Expected 2 subs, got %v", ln.NumSubs)
3330+
}
3331+
if len(ln.Subs) != 2 {
3332+
t.Fatalf("Expected subs to be returned, got %v", len(ln.Subs))
3333+
}
3334+
if (ln.Subs[0] != "foo" || ln.Subs[1] != "bar") && (ln.Subs[0] != "bar" || ln.Subs[1] != "foo") {
3335+
t.Fatalf("Unexpected subjects: %v", ln.Subs)
3336+
}
3337+
}
3338+
}
3339+
// Make sure that if we don't ask for subs, we don't get them
3340+
pollURL = fmt.Sprintf("http://127.0.0.1:%d/leafz", sa.MonitorAddr().Port)
3341+
for pollMode := 1; pollMode < 2; pollMode++ {
3342+
l := pollLeafz(t, sa, pollMode, pollURL, nil)
3343+
for _, ln := range l.Leafs {
3344+
if ln.NumSubs != 2 {
3345+
t.Fatalf("Number of subs should be 2, got %v", ln.NumSubs)
3346+
}
3347+
if len(ln.Subs) != 0 {
3348+
t.Fatalf("Subs should not have been returned, got %v", ln.Subs)
3349+
}
3350+
}
3351+
}
3352+
3353+
// Now polling server B.
3354+
pollURL = fmt.Sprintf("http://127.0.0.1:%d/leafz?subs=1", sb.MonitorAddr().Port)
3355+
for pollMode := 1; pollMode < 2; pollMode++ {
3356+
l := pollLeafz(t, sb, pollMode, pollURL, &LeafzOptions{Subscriptions: true})
3357+
if l.ID != sb.ID() {
3358+
t.Fatalf("Expected ID to be %q, got %q", sb.ID(), l.ID)
3359+
}
3360+
if l.Now.IsZero() {
3361+
t.Fatalf("Expected Now to be set, was not")
3362+
}
3363+
if l.NumLeafs != 2 {
3364+
t.Fatalf("Expected NumLeafs to be 1, got %v", l.NumLeafs)
3365+
}
3366+
if len(l.Leafs) != 2 {
3367+
t.Fatalf("Expected array to be len 2, got %v", len(l.Leafs))
3368+
}
3369+
for _, ln := range l.Leafs {
3370+
if ln.Account == acc1.Name {
3371+
if ln.OutMsgs != 0 || ln.OutBytes != 0 || ln.InMsgs != 1 || ln.InBytes == 0 {
3372+
t.Fatalf("Expected 1 InMsgs/Bytes and 0 OutMsgs/Bytes, got %+v", ln)
3373+
}
3374+
} else if ln.Account == acc2.Name {
3375+
if ln.OutMsgs != 0 || ln.OutBytes != 0 || ln.InMsgs != 2 || ln.InBytes == 0 {
3376+
t.Fatalf("Expected 2 InMsgs/Bytes and 0 OutMsgs/Bytes, got %+v", ln)
3377+
}
3378+
} else {
3379+
t.Fatalf("Expected account to be %q or %q, got %q", acc1.Name, acc2.Name, ln.Account)
3380+
}
3381+
if ln.RTT == "" {
3382+
t.Fatalf("RTT not tracked?")
3383+
}
3384+
if ln.NumSubs != 0 || len(ln.Subs) != 0 {
3385+
t.Fatalf("Did not expect sub, got %v (%v)", ln.NumSubs, ln.Subs)
3386+
}
3387+
}
3388+
}
3389+
}

server/server.go

+3
Original file line numberDiff line numberDiff line change
@@ -1399,6 +1399,7 @@ const (
13991399
ConnzPath = "/connz"
14001400
RoutezPath = "/routez"
14011401
GatewayzPath = "/gatewayz"
1402+
LeafzPath = "/leafz"
14021403
SubszPath = "/subsz"
14031404
StackszPath = "/stacksz"
14041405
)
@@ -1466,6 +1467,8 @@ func (s *Server) startMonitoring(secure bool) error {
14661467
mux.HandleFunc(RoutezPath, s.HandleRoutez)
14671468
// Gatewayz
14681469
mux.HandleFunc(GatewayzPath, s.HandleGatewayz)
1470+
// Leafz
1471+
mux.HandleFunc(LeafzPath, s.HandleLeafz)
14691472
// Subz
14701473
mux.HandleFunc(SubszPath, s.HandleSubsz)
14711474
// Subz alias for backwards compatibility

0 commit comments

Comments
 (0)