Skip to content
This repository was archived by the owner on May 26, 2022. It is now read-only.

Commit 66dcf0c

Browse files
add support for the resource manager
1 parent f776b7e commit 66dcf0c

File tree

6 files changed

+64
-26
lines changed

6 files changed

+64
-26
lines changed

go.mod

+5-5
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@ go 1.16
55
require (
66
github.com/ipfs/go-log/v2 v2.4.0
77
github.com/libp2p/go-conn-security-multistream v0.3.0
8-
github.com/libp2p/go-libp2p-core v0.13.1-0.20220104083644-a3dd401efe36
8+
github.com/libp2p/go-libp2p-core v0.13.1-0.20220107083545-005b84b557bb
99
github.com/libp2p/go-libp2p-peerstore v0.6.0
10-
github.com/libp2p/go-libp2p-quic-transport v0.13.0
11-
github.com/libp2p/go-libp2p-testing v0.5.0
12-
github.com/libp2p/go-libp2p-transport-upgrader v0.6.1-0.20220104084635-5fc0a74b41f0
10+
github.com/libp2p/go-libp2p-quic-transport v0.15.1-0.20220108115653-809bf14dd0e9
11+
github.com/libp2p/go-libp2p-testing v0.6.1-0.20220107070227-f126df46085e
12+
github.com/libp2p/go-libp2p-transport-upgrader v0.6.1-0.20220108150807-be66941c1ea9
1313
github.com/libp2p/go-libp2p-yamux v0.5.0
1414
github.com/libp2p/go-stream-muxer-multistream v0.3.0
15-
github.com/libp2p/go-tcp-transport v0.4.1-0.20220104085503-4ad75e6f32a5
15+
github.com/libp2p/go-tcp-transport v0.4.1-0.20220108170023-5b0f844d771b
1616
github.com/multiformats/go-multiaddr v0.5.0
1717
github.com/multiformats/go-multiaddr-fmt v0.1.0
1818
github.com/stretchr/testify v1.7.0

go.sum

+13-12
Original file line numberDiff line numberDiff line change
@@ -276,24 +276,26 @@ github.com/libp2p/go-libp2p-core v0.5.1/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt
276276
github.com/libp2p/go-libp2p-core v0.8.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
277277
github.com/libp2p/go-libp2p-core v0.10.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg=
278278
github.com/libp2p/go-libp2p-core v0.12.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg=
279-
github.com/libp2p/go-libp2p-core v0.13.1-0.20220104083644-a3dd401efe36 h1:b/pMmgc5EV+dqSc+MjkX5xPa1nV6EKiOb0L0XT03Lic=
280-
github.com/libp2p/go-libp2p-core v0.13.1-0.20220104083644-a3dd401efe36/go.mod h1:KlkHsZ0nKerWsXLZJm3LfFQwusI5k3iN4BgtYTE4IYE=
279+
github.com/libp2p/go-libp2p-core v0.13.1-0.20220106080354-2192774acd82/go.mod h1:KlkHsZ0nKerWsXLZJm3LfFQwusI5k3iN4BgtYTE4IYE=
280+
github.com/libp2p/go-libp2p-core v0.13.1-0.20220107083545-005b84b557bb h1:mMWwfcMQgaUY7AN2WKd+L7k0XnTkmvx159KNvHPg2oo=
281+
github.com/libp2p/go-libp2p-core v0.13.1-0.20220107083545-005b84b557bb/go.mod h1:KlkHsZ0nKerWsXLZJm3LfFQwusI5k3iN4BgtYTE4IYE=
281282
github.com/libp2p/go-libp2p-mplex v0.4.1 h1:/pyhkP1nLwjG3OM+VuaNJkQT/Pqq73WzB3aDN3Fx1sc=
282283
github.com/libp2p/go-libp2p-mplex v0.4.1/go.mod h1:cmy+3GfqfM1PceHTLL7zQzAAYaryDu6iPSC+CIb094g=
283284
github.com/libp2p/go-libp2p-peerstore v0.6.0 h1:HJminhQSGISBIRb93N6WK3t6Fa8OOTnHd/VBjL4mY5A=
284285
github.com/libp2p/go-libp2p-peerstore v0.6.0/go.mod h1:DGEmKdXrcYpK9Jha3sS7MhqYdInxJy84bIPtSu65bKc=
285286
github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k=
286287
github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA=
287-
github.com/libp2p/go-libp2p-quic-transport v0.13.0 h1:MTVojS4AnGD/rng6rF/HXEqwMHL27rHUEf3DaqSdnUw=
288-
github.com/libp2p/go-libp2p-quic-transport v0.13.0/go.mod h1:39/ZWJ1TW/jx1iFkKzzUg00W6tDJh73FC0xYudjr7Hc=
288+
github.com/libp2p/go-libp2p-quic-transport v0.15.1-0.20220108115653-809bf14dd0e9 h1:vKncWRP2UbfShrpqEdF7a1pkKlU0mdqPFY0bo3q0LuQ=
289+
github.com/libp2p/go-libp2p-quic-transport v0.15.1-0.20220108115653-809bf14dd0e9/go.mod h1:QgXb66laBD2IR9Jz0nbiK22cd+ywp+6D5TlPS29mNFs=
289290
github.com/libp2p/go-libp2p-testing v0.1.2-0.20200422005655-8775583591d8/go.mod h1:Qy8sAncLKpwXtS2dSnDOP8ktexIAHKu+J+pnZOFZLTc=
290291
github.com/libp2p/go-libp2p-testing v0.4.0/go.mod h1:Q+PFXYoiYFN5CAEG2w3gLPEzotlKsNSbKQ/lImlOWF0=
291-
github.com/libp2p/go-libp2p-testing v0.5.0 h1:bTjC29TTQ/ODq0ld3+0KLq3irdA5cAH3OMbRi0/QsvE=
292292
github.com/libp2p/go-libp2p-testing v0.5.0/go.mod h1:QBk8fqIL1XNcno/l3/hhaIEn4aLRijpYOR+zVjjlh+A=
293+
github.com/libp2p/go-libp2p-testing v0.6.1-0.20220107070227-f126df46085e h1:gy++yXkcKv6pC6sgGPJRi0CkWPdgYFAx6zOWGxNiORg=
294+
github.com/libp2p/go-libp2p-testing v0.6.1-0.20220107070227-f126df46085e/go.mod h1:sKGFutxeBvU1FajPDMzOdNeRBTujG3IvX1Q1+xb5L38=
293295
github.com/libp2p/go-libp2p-tls v0.3.0 h1:8BgvUJiOTcj0Gp6XvEicF0rL5aUtRg/UzEdeZDmDlC8=
294296
github.com/libp2p/go-libp2p-tls v0.3.0/go.mod h1:fwF5X6PWGxm6IDRwF3V8AVCCj/hOd5oFlg+wo2FxJDY=
295-
github.com/libp2p/go-libp2p-transport-upgrader v0.6.1-0.20220104084635-5fc0a74b41f0 h1:eD/QJCpcImYOUl6MdBuxMByVaEe5VMm463zJG6oUg9o=
296-
github.com/libp2p/go-libp2p-transport-upgrader v0.6.1-0.20220104084635-5fc0a74b41f0/go.mod h1:ByIyNe8asQhgcyIHetb4f+UgV+hDrA8pQ3L/TgNs+RI=
297+
github.com/libp2p/go-libp2p-transport-upgrader v0.6.1-0.20220108150807-be66941c1ea9 h1:+gpux0WFHEuQJBpCeBVUqYKBJSTGezSQ+2b7JidUMqw=
298+
github.com/libp2p/go-libp2p-transport-upgrader v0.6.1-0.20220108150807-be66941c1ea9/go.mod h1:FpQXSlueDNp9POKfwEnKqqgtI9mXy3ibpe7Z3ueBros=
297299
github.com/libp2p/go-libp2p-yamux v0.5.0 h1:ZzmUhbQE+X7NuYUT2naxN31JyebZfRmpZVhKtRP13ys=
298300
github.com/libp2p/go-libp2p-yamux v0.5.0/go.mod h1:AyR8k5EzyM2QN9Bbdg6X1SkVVuqLwTGf0L4DFq9g6po=
299301
github.com/libp2p/go-maddr-filter v0.1.0/go.mod h1:VzZhTXkMucEGGEOSKddrwGiOv0tUhgnKqNEmIAz/bPU=
@@ -317,14 +319,14 @@ github.com/libp2p/go-sockaddr v0.1.0 h1:Y4s3/jNoryVRKEBrkJ576F17CPOaMIzUeCsg7dlT
317319
github.com/libp2p/go-sockaddr v0.1.0/go.mod h1:syPvOmNs24S3dFVGJA1/mrqdeijPxLV2Le3BRLKd68k=
318320
github.com/libp2p/go-stream-muxer-multistream v0.3.0 h1:TqnSHPJEIqDEO7h1wZZ0p3DXdvDSiLHQidKKUGZtiOY=
319321
github.com/libp2p/go-stream-muxer-multistream v0.3.0/go.mod h1:yDh8abSIzmZtqtOt64gFJUXEryejzNb0lisTt+fAMJA=
320-
github.com/libp2p/go-tcp-transport v0.4.1-0.20220104085503-4ad75e6f32a5 h1:/x3GSszKipn1nlKY0C5at59fBLYyJeObd5gm32DrobM=
321-
github.com/libp2p/go-tcp-transport v0.4.1-0.20220104085503-4ad75e6f32a5/go.mod h1:YPwlF5gW5BnFikKoQBuJeQkPXAn+z2wTzDpJKamkgjY=
322+
github.com/libp2p/go-tcp-transport v0.4.1-0.20220108170023-5b0f844d771b h1:KWpzjWlTdMN8KlREoEgWbYLPPhouy7fMh5xXttgYk0M=
323+
github.com/libp2p/go-tcp-transport v0.4.1-0.20220108170023-5b0f844d771b/go.mod h1:Rhw3Zrx0b5eR9BZ1cHH7NP28PdcHbBwUcMRwJTGacrs=
322324
github.com/libp2p/go-yamux v1.4.1 h1:P1Fe9vF4th5JOxxgQvfbOHkrGqIZniTLf+ddhZp8YTI=
323325
github.com/libp2p/go-yamux v1.4.1/go.mod h1:fr7aVgmdNGJK+N1g+b6DW6VxzbRCjCOejR/hkmpooHE=
324326
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
325327
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
326-
github.com/lucas-clemente/quic-go v0.23.0 h1:5vFnKtZ6nHDFsc/F3uuiF4T3y/AXaQdxjUqiVw26GZE=
327-
github.com/lucas-clemente/quic-go v0.23.0/go.mod h1:paZuzjXCE5mj6sikVLMvqXk8lJV2AsqtJ6bDhjEfxx0=
328+
github.com/lucas-clemente/quic-go v0.24.0 h1:ToR7SIIEdrgOhgVTHvPgdVRJfgVy+N0wQAagH7L4d5g=
329+
github.com/lucas-clemente/quic-go v0.24.0/go.mod h1:paZuzjXCE5mj6sikVLMvqXk8lJV2AsqtJ6bDhjEfxx0=
328330
github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI=
329331
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
330332
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
@@ -740,7 +742,6 @@ golang.org/x/sys v0.0.0-20210317225723-c4fcb01b228e/go.mod h1:h1NjWce9XRLGQEsW7w
740742
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
741743
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
742744
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
743-
golang.org/x/sys v0.0.0-20210511113859-b0526f3d8744/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
744745
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I=
745746
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
746747
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=

swarm.go

+16
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,13 @@ func WithDialTimeoutLocal(t time.Duration) Option {
7676
}
7777
}
7878

79+
func WithResourceManager(m network.ResourceManager) Option {
80+
return func(s *Swarm) error {
81+
s.rcmgr = m
82+
return nil
83+
}
84+
}
85+
7986
// Swarm is a connection muxer, allowing connections to other peers to
8087
// be opened and closed, while still using the same Chan for all
8188
// communication. The Chan sends/receives Messages, which note the
@@ -88,6 +95,8 @@ type Swarm struct {
8895
// down before continuing.
8996
refs sync.WaitGroup
9097

98+
rcmgr network.ResourceManager
99+
91100
local peer.ID
92101
peers peerstore.Peerstore
93102

@@ -156,6 +165,9 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, opts ...Option) (*Swarm,
156165
return nil, err
157166
}
158167
}
168+
if s.rcmgr == nil {
169+
s.rcmgr = network.NullResourceManager
170+
}
159171

160172
s.dsync = newDialSync(s.dialWorkerLoop)
161173
s.limiter = newDialLimiter(s.dialAddr)
@@ -586,6 +598,10 @@ func (s *Swarm) String() string {
586598
return fmt.Sprintf("<Swarm %s>", s.LocalPeer())
587599
}
588600

601+
func (s *Swarm) ResourceManager() network.ResourceManager {
602+
return s.rcmgr
603+
}
604+
589605
// Swarm is a Network.
590606
var _ network.Network = (*Swarm)(nil)
591607
var _ transport.TransportNetwork = (*Swarm)(nil)

swarm_conn.go

+21-4
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ type Conn struct {
4242
stat network.ConnStats
4343
}
4444

45+
var _ network.Conn = &Conn{}
46+
4547
func (c *Conn) ID() string {
4648
// format: <first 10 chars of peer id>-<global conn ordinal>
4749
return fmt.Sprintf("%s-%d", c.RemotePeer().Pretty()[0:10], c.id)
@@ -93,6 +95,7 @@ func (c *Conn) removeStream(s *Stream) {
9395
c.stat.NumStreams--
9496
delete(c.streams.m, s)
9597
c.streams.Unlock()
98+
s.scope.Done()
9699
}
97100

98101
// listens for new streams.
@@ -109,9 +112,14 @@ func (c *Conn) start() {
109112
if err != nil {
110113
return
111114
}
115+
scope, err := c.swarm.ResourceManager().OpenStream(c.RemotePeer(), network.DirInbound)
116+
if err != nil {
117+
ts.Reset()
118+
continue
119+
}
112120
c.swarm.refs.Add(1)
113121
go func() {
114-
s, err := c.addStream(ts, network.DirInbound)
122+
s, err := c.addStream(ts, network.DirInbound, scope)
115123

116124
// Don't defer this. We don't want to block
117125
// swarm shutdown on the connection handler.
@@ -186,19 +194,23 @@ func (c *Conn) NewStream(ctx context.Context) (network.Stream, error) {
186194
}
187195
}
188196

197+
scope, err := c.swarm.ResourceManager().OpenStream(c.RemotePeer(), network.DirOutbound)
198+
if err != nil {
199+
return nil, err
200+
}
189201
ts, err := c.conn.OpenStream(ctx)
190-
191202
if err != nil {
192203
return nil, err
193204
}
194-
return c.addStream(ts, network.DirOutbound)
205+
return c.addStream(ts, network.DirOutbound, scope)
195206
}
196207

197-
func (c *Conn) addStream(ts mux.MuxedStream, dir network.Direction) (*Stream, error) {
208+
func (c *Conn) addStream(ts mux.MuxedStream, dir network.Direction, scope network.StreamManagementScope) (*Stream, error) {
198209
c.streams.Lock()
199210
// Are we still online?
200211
if c.streams.m == nil {
201212
c.streams.Unlock()
213+
scope.Done()
202214
ts.Reset()
203215
return nil, ErrConnClosed
204216
}
@@ -207,6 +219,7 @@ func (c *Conn) addStream(ts mux.MuxedStream, dir network.Direction) (*Stream, er
207219
s := &Stream{
208220
stream: ts,
209221
conn: c,
222+
scope: scope,
210223
stat: network.Stats{
211224
Direction: dir,
212225
Opened: time.Now(),
@@ -244,3 +257,7 @@ func (c *Conn) GetStreams() []network.Stream {
244257
}
245258
return streams
246259
}
260+
261+
func (c *Conn) Scope() network.ConnScope {
262+
return c.conn.Scope()
263+
}

swarm_stream.go

+5
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type Stream struct {
2121

2222
stream mux.MuxedStream
2323
conn *Conn
24+
scope network.StreamManagementScope
2425

2526
closeOnce sync.Once
2627

@@ -154,3 +155,7 @@ func (s *Stream) SetWriteDeadline(t time.Time) error {
154155
func (s *Stream) Stat() network.Stats {
155156
return s.stat
156157
}
158+
159+
func (s *Stream) Scope() network.StreamScope {
160+
return s.scope
161+
}

testing/testing.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/libp2p/go-libp2p-core/peerstore"
1414
"github.com/libp2p/go-libp2p-core/sec/insecure"
1515
"github.com/libp2p/go-libp2p-core/transport"
16+
"github.com/libp2p/go-tcp-transport"
1617

1718
csms "github.com/libp2p/go-conn-security-multistream"
1819
"github.com/libp2p/go-libp2p-peerstore/pstoremem"
@@ -22,8 +23,6 @@ import (
2223
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
2324
yamux "github.com/libp2p/go-libp2p-yamux"
2425
msmux "github.com/libp2p/go-stream-muxer-multistream"
25-
"github.com/libp2p/go-tcp-transport"
26-
2726
ma "github.com/multiformats/go-multiaddr"
2827
"github.com/stretchr/testify/require"
2928
)
@@ -90,7 +89,7 @@ func GenUpgrader(t *testing.T, n *swarm.Swarm, opts ...tptu.Option) transport.Up
9089

9190
stMuxer := msmux.NewBlankTransport()
9291
stMuxer.AddTransport("/yamux/1.0.0", yamux.DefaultTransport)
93-
u, err := tptu.New(secMuxer, stMuxer, opts...)
92+
u, err := tptu.New(secMuxer, stMuxer, nil, opts...)
9493
require.NoError(t, err)
9594
return u
9695
}
@@ -140,7 +139,7 @@ func GenSwarm(t *testing.T, opts ...Option) *swarm.Swarm {
140139
if cfg.disableReuseport {
141140
tcpOpts = append(tcpOpts, tcp.DisableReuseport())
142141
}
143-
tcpTransport, err := tcp.NewTCPTransport(upgrader, tcpOpts...)
142+
tcpTransport, err := tcp.NewTCPTransport(upgrader, nil, tcpOpts...)
144143
require.NoError(t, err)
145144
if err := s.AddTransport(tcpTransport); err != nil {
146145
t.Fatal(err)
@@ -152,7 +151,7 @@ func GenSwarm(t *testing.T, opts ...Option) *swarm.Swarm {
152151
}
153152
}
154153
if !cfg.disableQUIC {
155-
quicTransport, err := quic.NewTransport(p.PrivKey, nil, cfg.connectionGater)
154+
quicTransport, err := quic.NewTransport(p.PrivKey, nil, cfg.connectionGater, nil)
156155
if err != nil {
157156
t.Fatal(err)
158157
}

0 commit comments

Comments
 (0)