Skip to content
This repository was archived by the owner on May 26, 2022. It is now read-only.

Commit 67846c7

Browse files
Merge pull request #298 from libp2p/stream-count-connstats
count the number of streams on a connection for the stats
2 parents a337a78 + 10e2caa commit 67846c7

File tree

7 files changed

+68
-25
lines changed

7 files changed

+68
-25
lines changed

go.mod

+3-3
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@ require (
66
github.com/ipfs/go-log/v2 v2.4.0
77
github.com/libp2p/go-addr-util v0.1.0
88
github.com/libp2p/go-conn-security-multistream v0.3.0
9-
github.com/libp2p/go-libp2p-core v0.11.0
10-
github.com/libp2p/go-libp2p-peerstore v0.4.0
9+
github.com/libp2p/go-libp2p-core v0.13.0
10+
github.com/libp2p/go-libp2p-peerstore v0.6.0
1111
github.com/libp2p/go-libp2p-quic-transport v0.13.0
1212
github.com/libp2p/go-libp2p-testing v0.5.0
13-
github.com/libp2p/go-libp2p-transport-upgrader v0.5.0
13+
github.com/libp2p/go-libp2p-transport-upgrader v0.6.0
1414
github.com/libp2p/go-libp2p-yamux v0.5.0
1515
github.com/libp2p/go-maddr-filter v0.1.0
1616
github.com/libp2p/go-stream-muxer-multistream v0.3.0

go.sum

+8-9
Original file line numberDiff line numberDiff line change
@@ -217,12 +217,10 @@ github.com/ipfs/go-ds-badger v0.3.0/go.mod h1:1ke6mXNqeV8K3y5Ak2bAA0osoTfmxUdupV
217217
github.com/ipfs/go-ds-leveldb v0.5.0/go.mod h1:d3XG9RUDzQ6V4SHi8+Xgj9j1XuEk1z82lquxrVbml/Q=
218218
github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw=
219219
github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM=
220+
github.com/ipfs/go-log v1.0.4 h1:6nLQdX4W8P9yZZFH7mO+X/PzjN8Laozm/lMJ6esdgzY=
220221
github.com/ipfs/go-log v1.0.4/go.mod h1:oDCg2FkjogeFOhqqb+N39l2RpTNPL6F/StPkB3kPgcs=
221-
github.com/ipfs/go-log v1.0.5 h1:2dOuUCB1Z7uoczMWgAyDck5JLb72zHzrMnGnCNNbvY8=
222-
github.com/ipfs/go-log v1.0.5/go.mod h1:j0b8ZoR+7+R99LD9jZ6+AJsrzkPbSXbZfGakb5JPtIo=
223222
github.com/ipfs/go-log/v2 v2.0.5/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscwkWG+dw=
224223
github.com/ipfs/go-log/v2 v2.1.1/go.mod h1:2v2nsGfZsvvAJz13SyFzf9ObaqwHiHxsPLEHntrv9KM=
225-
github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g=
226224
github.com/ipfs/go-log/v2 v2.3.0/go.mod h1:QqGoj30OTpnKaG/LKTGTxoP2mmQtjVMEnK72gynbe/g=
227225
github.com/ipfs/go-log/v2 v2.4.0 h1:iR/2o9PGWanVJrBgIH5Ff8mPGOwpqLaPIAFqSnsdlzk=
228226
github.com/ipfs/go-log/v2 v2.4.0/go.mod h1:nPZnh7Cj7lwS3LpRU5Mwr2ol1c2gXIEXuF6aywqrtmo=
@@ -279,14 +277,14 @@ github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL
279277
github.com/libp2p/go-libp2p-core v0.5.0/go.mod h1:49XGI+kc38oGVwqSBhDEwytaAxgZasHhFfQKibzTls0=
280278
github.com/libp2p/go-libp2p-core v0.5.1/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt5/a35Sm4upn4Y=
281279
github.com/libp2p/go-libp2p-core v0.8.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
282-
github.com/libp2p/go-libp2p-core v0.8.6/go.mod h1:dgHr0l0hIKfWpGpqAMbpo19pen9wJfdCGv51mTmdpmM=
283280
github.com/libp2p/go-libp2p-core v0.10.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg=
284-
github.com/libp2p/go-libp2p-core v0.11.0 h1:75jAgdA+IChNa+/mZXogfmrGkgwxkVvxmIC7pV+F6sI=
285-
github.com/libp2p/go-libp2p-core v0.11.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg=
281+
github.com/libp2p/go-libp2p-core v0.12.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg=
282+
github.com/libp2p/go-libp2p-core v0.13.0 h1:IFG/s8dN6JN2OTrXX9eq2wNU/Zlz2KLdwZUp5FplgXI=
283+
github.com/libp2p/go-libp2p-core v0.13.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg=
286284
github.com/libp2p/go-libp2p-mplex v0.4.1 h1:/pyhkP1nLwjG3OM+VuaNJkQT/Pqq73WzB3aDN3Fx1sc=
287285
github.com/libp2p/go-libp2p-mplex v0.4.1/go.mod h1:cmy+3GfqfM1PceHTLL7zQzAAYaryDu6iPSC+CIb094g=
288-
github.com/libp2p/go-libp2p-peerstore v0.4.0 h1:DOhRJLnM9Dc9lIXi3rPDZBf789LXy1BrzwIs7Tj0cKA=
289-
github.com/libp2p/go-libp2p-peerstore v0.4.0/go.mod h1:rDJUFyzEWPpXpEwywkcTYYzDHlwza8riYMaUzaN6hX0=
286+
github.com/libp2p/go-libp2p-peerstore v0.6.0 h1:HJminhQSGISBIRb93N6WK3t6Fa8OOTnHd/VBjL4mY5A=
287+
github.com/libp2p/go-libp2p-peerstore v0.6.0/go.mod h1:DGEmKdXrcYpK9Jha3sS7MhqYdInxJy84bIPtSu65bKc=
290288
github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k=
291289
github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA=
292290
github.com/libp2p/go-libp2p-quic-transport v0.13.0 h1:MTVojS4AnGD/rng6rF/HXEqwMHL27rHUEf3DaqSdnUw=
@@ -297,8 +295,9 @@ github.com/libp2p/go-libp2p-testing v0.5.0 h1:bTjC29TTQ/ODq0ld3+0KLq3irdA5cAH3OM
297295
github.com/libp2p/go-libp2p-testing v0.5.0/go.mod h1:QBk8fqIL1XNcno/l3/hhaIEn4aLRijpYOR+zVjjlh+A=
298296
github.com/libp2p/go-libp2p-tls v0.3.0 h1:8BgvUJiOTcj0Gp6XvEicF0rL5aUtRg/UzEdeZDmDlC8=
299297
github.com/libp2p/go-libp2p-tls v0.3.0/go.mod h1:fwF5X6PWGxm6IDRwF3V8AVCCj/hOd5oFlg+wo2FxJDY=
300-
github.com/libp2p/go-libp2p-transport-upgrader v0.5.0 h1:7SDl3O2+AYOgfE40Mis83ClpfGNkNA6m4FwhbOHs+iI=
301298
github.com/libp2p/go-libp2p-transport-upgrader v0.5.0/go.mod h1:Rc+XODlB3yce7dvFV4q/RmyJGsFcCZRkeZMu/Zdg0mo=
299+
github.com/libp2p/go-libp2p-transport-upgrader v0.6.0 h1:GfMCU+2aGGEm1zW3UcOz6wYSn8tXQalFfVfcww99i5A=
300+
github.com/libp2p/go-libp2p-transport-upgrader v0.6.0/go.mod h1:1e07y1ZSZdHo9HPbuU8IztM1Cj+DR5twgycb4pnRzRo=
302301
github.com/libp2p/go-libp2p-yamux v0.5.0 h1:ZzmUhbQE+X7NuYUT2naxN31JyebZfRmpZVhKtRP13ys=
303302
github.com/libp2p/go-libp2p-yamux v0.5.0/go.mod h1:AyR8k5EzyM2QN9Bbdg6X1SkVVuqLwTGf0L4DFq9g6po=
304303
github.com/libp2p/go-maddr-filter v0.1.0 h1:4ACqZKw8AqiuJfwFGq1CYDFugfXTOos+qQ3DETkhtCE=

swarm.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
212212
)
213213

214214
// create the Stat object, initializing with the underlying connection Stat if available
215-
var stat network.Stat
215+
var stat network.ConnStats
216216
if cs, ok := tc.(network.ConnStat); ok {
217217
stat = cs.Stat()
218218
}

swarm_conn.go

+11-8
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ type Conn struct {
3939
m map[*Stream]struct{}
4040
}
4141

42-
stat network.Stat
42+
stat network.ConnStats
4343
}
4444

4545
func (c *Conn) ID() string {
@@ -90,6 +90,7 @@ func (c *Conn) doClose() {
9090

9191
func (c *Conn) removeStream(s *Stream) {
9292
c.streams.Lock()
93+
c.stat.NumStreams--
9394
delete(c.streams.m, s)
9495
c.streams.Unlock()
9596
}
@@ -171,7 +172,9 @@ func (c *Conn) RemotePublicKey() ic.PubKey {
171172
}
172173

173174
// Stat returns metadata pertaining to this connection
174-
func (c *Conn) Stat() network.Stat {
175+
func (c *Conn) Stat() network.ConnStats {
176+
c.streams.Lock()
177+
defer c.streams.Unlock()
175178
return c.stat
176179
}
177180

@@ -201,16 +204,16 @@ func (c *Conn) addStream(ts mux.MuxedStream, dir network.Direction) (*Stream, er
201204
}
202205

203206
// Wrap and register the stream.
204-
stat := network.Stat{
205-
Direction: dir,
206-
Opened: time.Now(),
207-
}
208207
s := &Stream{
209208
stream: ts,
210209
conn: c,
211-
stat: stat,
212-
id: atomic.AddUint64(&c.swarm.nextStreamID, 1),
210+
stat: network.Stats{
211+
Direction: dir,
212+
Opened: time.Now(),
213+
},
214+
id: atomic.AddUint64(&c.swarm.nextStreamID, 1),
213215
}
216+
c.stat.NumStreams++
214217
c.streams.m[s] = struct{}{}
215218

216219
// Released once the stream disconnect notifications have finished

swarm_net_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,9 @@ func TestNetworkOpenStream(t *testing.T) {
130130
t.Fatal(err)
131131
}
132132

133-
numStreams := 0
133+
var numStreams int
134134
for _, conn := range nets[0].ConnsToPeer(nets[1].LocalPeer()) {
135-
numStreams += len(conn.GetStreams())
135+
numStreams += conn.Stat().NumStreams
136136
}
137137

138138
if numStreams != 1 {

swarm_stream.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type Stream struct {
2828

2929
protocol atomic.Value
3030

31-
stat network.Stat
31+
stat network.Stats
3232
}
3333

3434
func (s *Stream) ID() string {
@@ -151,6 +151,6 @@ func (s *Stream) SetWriteDeadline(t time.Time) error {
151151
}
152152

153153
// Stat returns metadata information for this stream.
154-
func (s *Stream) Stat() network.Stat {
154+
func (s *Stream) Stat() network.Stats {
155155
return s.stat
156156
}

swarm_test.go

+41
Original file line numberDiff line numberDiff line change
@@ -424,3 +424,44 @@ func TestPreventDialListenAddr(t *testing.T) {
424424
t.Fatal("expected dial to fail: %w", err)
425425
}
426426
}
427+
428+
func TestStreamCount(t *testing.T) {
429+
s1 := GenSwarm(t)
430+
s2 := GenSwarm(t)
431+
connectSwarms(t, context.Background(), []*swarm.Swarm{s2, s1})
432+
433+
countStreams := func() (n int) {
434+
var num int
435+
for _, c := range s1.ConnsToPeer(s2.LocalPeer()) {
436+
n += c.Stat().NumStreams
437+
num += len(c.GetStreams())
438+
}
439+
require.Equal(t, n, num, "inconsistent stream count")
440+
return
441+
}
442+
443+
streams := make(chan network.Stream, 20)
444+
streamAccepted := make(chan struct{}, 1)
445+
s1.SetStreamHandler(func(str network.Stream) {
446+
streams <- str
447+
streamAccepted <- struct{}{}
448+
})
449+
450+
for i := 0; i < 10; i++ {
451+
str, err := s2.NewStream(context.Background(), s1.LocalPeer())
452+
require.NoError(t, err)
453+
str.Write([]byte("foobar"))
454+
<-streamAccepted
455+
}
456+
require.Eventually(t, func() bool { return len(streams) == 10 }, 5*time.Second, 10*time.Millisecond)
457+
require.Equal(t, countStreams(), 10)
458+
(<-streams).Reset()
459+
(<-streams).Close()
460+
require.Equal(t, countStreams(), 8)
461+
462+
str, err := s1.NewStream(context.Background(), s2.LocalPeer())
463+
require.NoError(t, err)
464+
require.Equal(t, countStreams(), 9)
465+
str.Close()
466+
require.Equal(t, countStreams(), 8)
467+
}

0 commit comments

Comments
 (0)