Skip to content

Commit 346ff5a

Browse files
achingbrainmaschad
andauthored
fix: opt-in to toplogy notifications on transient connections (#2049)
Adds a `notifyOnTransient` option when registering a network topology to opt-in to being notified when peers that support the registered protocol connect over transient connections. False by default. The logic has been switched to check each connection's transient property and only notify if the user has opted in. The side effect here is that if `notifyOnTransient` is true, and the peer ends up opening a direct connection (for example they dial us via circuit relay, open a stream to do the WebRTC SDP exchange, then open a WebRTC connection), identify will run on the second connection so the topology will receive two notifications. This is not a breaking change since the previous behaviour would have been to only notify on the initial transient connection, which you can't do data-heavy things like bitswap over, or long-lived things like GossipSub so is probably a bug. Fixes #2036 --------- Co-authored-by: Chad Nehemiah <[email protected]>
1 parent 50442d7 commit 346ff5a

File tree

10 files changed

+235
-97
lines changed

10 files changed

+235
-97
lines changed

packages/interface/src/index.ts

+5
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,11 @@ export interface IdentifyResult {
9999
* If sent by the remote peer this is the deserialized signed peer record
100100
*/
101101
signedPeerRecord?: SignedPeerRecord
102+
103+
/**
104+
* The connection that the identify protocol ran over
105+
*/
106+
connection: Connection
102107
}
103108

104109
/**

packages/interface/src/topology/index.ts

+15
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,21 @@ export interface Topology {
55
min?: number
66
max?: number
77

8+
/**
9+
* If true, invoke `onConnect` for this topology on transient (e.g. short-lived
10+
* and/or data-limited) connections. (default: false)
11+
*/
12+
notifyOnTransient?: boolean
13+
14+
/**
15+
* Invoked when a new connection is opened to a peer that supports the
16+
* registered protocol
17+
*/
818
onConnect?(peerId: PeerId, conn: Connection): void
19+
20+
/**
21+
* Invoked when the last connection to a peer that supports the registered
22+
* protocol closes
23+
*/
924
onDisconnect?(peerId: PeerId): void
1025
}

packages/libp2p/src/circuit-relay/transport/discovery.ts

+1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ export class RelayDiscovery extends TypedEventEmitter<RelayDiscoveryEvents> impl
5757
// register a topology listener for when new peers are encountered
5858
// that support the hop protocol
5959
this.topologyId = await this.registrar.register(RELAY_V2_HOP_CODEC, {
60+
notifyOnTransient: true,
6061
onConnect: (peerId) => {
6162
this.safeDispatchEvent('relay:discover', { detail: peerId })
6263
}

packages/libp2p/src/dcutr/dcutr.ts

+1
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ export class DefaultDCUtRService implements Startable {
7272
// register for notifications of when peers that support DCUtR connect
7373
// nb. requires the identify service to be enabled
7474
this.topologyId = await this.registrar.register(multicodec, {
75+
notifyOnTransient: true,
7576
onConnect: (peerId, connection) => {
7677
if (!connection.transient) {
7778
// the connection is already direct, no upgrade is required

packages/libp2p/src/identify/identify.ts

+23-24
Original file line numberDiff line numberDiff line change
@@ -315,22 +315,7 @@ export class DefaultIdentifyService implements Startable, IdentifyService {
315315
this.addressManager.addObservedAddr(cleanObservedAddr)
316316
}
317317

318-
const signedPeerRecord = await this.#consumeIdentifyMessage(connection.remotePeer, message)
319-
320-
const result: IdentifyResult = {
321-
peerId: id,
322-
protocolVersion: message.protocolVersion,
323-
agentVersion: message.agentVersion,
324-
publicKey: message.publicKey,
325-
listenAddrs: message.listenAddrs.map(buf => multiaddr(buf)),
326-
observedAddr: message.observedAddr == null ? undefined : multiaddr(message.observedAddr),
327-
protocols: message.protocols,
328-
signedPeerRecord
329-
}
330-
331-
this.events.safeDispatchEvent('peer:identify', { detail: result })
332-
333-
return result
318+
return this.#consumeIdentifyMessage(connection, message)
334319
}
335320

336321
/**
@@ -405,7 +390,7 @@ export class DefaultIdentifyService implements Startable, IdentifyService {
405390
const message = await pb.read(options)
406391
await stream.close(options)
407392

408-
await this.#consumeIdentifyMessage(connection.remotePeer, message)
393+
await this.#consumeIdentifyMessage(connection, message)
409394
} catch (err: any) {
410395
log.error('received invalid message', err)
411396
stream.abort(err)
@@ -415,8 +400,8 @@ export class DefaultIdentifyService implements Startable, IdentifyService {
415400
log('handled push from %p', connection.remotePeer)
416401
}
417402

418-
async #consumeIdentifyMessage (remotePeer: PeerId, message: Identify): Promise<SignedPeerRecord | undefined> {
419-
log('received identify from %p', remotePeer)
403+
async #consumeIdentifyMessage (connection: Connection, message: Identify): Promise<IdentifyResult> {
404+
log('received identify from %p', connection.remotePeer)
420405

421406
if (message == null) {
422407
throw new Error('Message was null or undefined')
@@ -436,7 +421,7 @@ export class DefaultIdentifyService implements Startable, IdentifyService {
436421

437422
// if the peer record has been sent, prefer the addresses in the record as they are signed by the remote peer
438423
if (message.signedPeerRecord != null) {
439-
log('received signedPeerRecord in push from %p', remotePeer)
424+
log('received signedPeerRecord in push from %p', connection.remotePeer)
440425

441426
let peerRecordEnvelope = message.signedPeerRecord
442427
const envelope = await RecordEnvelope.openAndCertify(peerRecordEnvelope, PeerRecord.DOMAIN)
@@ -448,7 +433,7 @@ export class DefaultIdentifyService implements Startable, IdentifyService {
448433
}
449434

450435
// Make sure remote peer is the one sending the record
451-
if (!remotePeer.equals(peerRecord.peerId)) {
436+
if (!connection.remotePeer.equals(peerRecord.peerId)) {
452437
throw new Error('signing key does not match remote PeerId')
453438
}
454439

@@ -494,7 +479,7 @@ export class DefaultIdentifyService implements Startable, IdentifyService {
494479
addresses: peerRecord.multiaddrs
495480
}
496481
} else {
497-
log('%p did not send a signed peer record', remotePeer)
482+
log('%p did not send a signed peer record', connection.remotePeer)
498483
}
499484

500485
if (message.agentVersion != null) {
@@ -505,9 +490,23 @@ export class DefaultIdentifyService implements Startable, IdentifyService {
505490
peer.metadata.set('ProtocolVersion', uint8ArrayFromString(message.protocolVersion))
506491
}
507492

508-
await this.peerStore.patch(remotePeer, peer)
493+
await this.peerStore.patch(connection.remotePeer, peer)
509494

510-
return output
495+
const result: IdentifyResult = {
496+
peerId: connection.remotePeer,
497+
protocolVersion: message.protocolVersion,
498+
agentVersion: message.agentVersion,
499+
publicKey: message.publicKey,
500+
listenAddrs: message.listenAddrs.map(buf => multiaddr(buf)),
501+
observedAddr: message.observedAddr == null ? undefined : multiaddr(message.observedAddr),
502+
protocols: message.protocols,
503+
signedPeerRecord: output,
504+
connection
505+
}
506+
507+
this.events.safeDispatchEvent('peer:identify', { detail: result })
508+
509+
return result
511510
}
512511
}
513512

packages/libp2p/src/registrar.ts

+19-50
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { CodeError } from '@libp2p/interface/errors'
22
import { logger } from '@libp2p/logger'
33
import merge from 'merge-options'
44
import { codes } from './errors.js'
5-
import type { Libp2pEvents, PeerUpdate } from '@libp2p/interface'
5+
import type { IdentifyResult, Libp2pEvents, PeerUpdate } from '@libp2p/interface'
66
import type { TypedEventTarget } from '@libp2p/interface/events'
77
import type { PeerId } from '@libp2p/interface/peer-id'
88
import type { PeerStore } from '@libp2p/interface/peer-store'
@@ -37,11 +37,11 @@ export class DefaultRegistrar implements Registrar {
3737

3838
this._onDisconnect = this._onDisconnect.bind(this)
3939
this._onPeerUpdate = this._onPeerUpdate.bind(this)
40-
this._onConnect = this._onConnect.bind(this)
40+
this._onPeerIdentify = this._onPeerIdentify.bind(this)
4141

4242
this.components.events.addEventListener('peer:disconnect', this._onDisconnect)
43-
this.components.events.addEventListener('peer:connect', this._onConnect)
4443
this.components.events.addEventListener('peer:update', this._onPeerUpdate)
44+
this.components.events.addEventListener('peer:identify', this._onPeerIdentify)
4545
}
4646

4747
getProtocols (): string[] {
@@ -183,52 +183,12 @@ export class DefaultRegistrar implements Registrar {
183183
}
184184

185185
/**
186-
* On peer connected if we already have their protocols. Usually used for reconnects
187-
* as change:protocols event won't be emitted due to identical protocols.
188-
*/
189-
_onConnect (evt: CustomEvent<PeerId>): void {
190-
const remotePeer = evt.detail
191-
192-
void this.components.peerStore.get(remotePeer)
193-
.then(peer => {
194-
const connection = this.components.connectionManager.getConnections(peer.id)[0]
195-
196-
if (connection == null) {
197-
log('peer %p connected but the connection manager did not have a connection', peer)
198-
// peer disconnected while we were loading their details from the peer store
199-
return
200-
}
201-
202-
for (const protocol of peer.protocols) {
203-
const topologies = this.topologies.get(protocol)
204-
205-
if (topologies == null) {
206-
// no topologies are interested in this protocol
207-
continue
208-
}
209-
210-
for (const topology of topologies.values()) {
211-
topology.onConnect?.(remotePeer, connection)
212-
}
213-
}
214-
})
215-
.catch(err => {
216-
if (err.code === codes.ERR_NOT_FOUND) {
217-
// peer has not completed identify so they are not in the peer store
218-
return
219-
}
220-
221-
log.error('could not inform topologies of connecting peer %p', remotePeer, err)
222-
})
223-
}
224-
225-
/**
226-
* Check if a new peer support the multicodecs for this topology
186+
* When a peer is updated, if they have removed supported protocols notify any
187+
* topologies interested in the removed protocols.
227188
*/
228189
_onPeerUpdate (evt: CustomEvent<PeerUpdate>): void {
229190
const { peer, previous } = evt.detail
230191
const removed = (previous?.protocols ?? []).filter(protocol => !peer.protocols.includes(protocol))
231-
const added = peer.protocols.filter(protocol => !(previous?.protocols ?? []).includes(protocol))
232192

233193
for (const protocol of removed) {
234194
const topologies = this.topologies.get(protocol)
@@ -242,8 +202,18 @@ export class DefaultRegistrar implements Registrar {
242202
topology.onDisconnect?.(peer.id)
243203
}
244204
}
205+
}
245206

246-
for (const protocol of added) {
207+
/**
208+
* After identify has completed and we have received the list of supported
209+
* protocols, notify any topologies interested in those protocols.
210+
*/
211+
_onPeerIdentify (evt: CustomEvent<IdentifyResult>): void {
212+
const protocols = evt.detail.protocols
213+
const connection = evt.detail.connection
214+
const peerId = evt.detail.peerId
215+
216+
for (const protocol of protocols) {
247217
const topologies = this.topologies.get(protocol)
248218

249219
if (topologies == null) {
@@ -252,12 +222,11 @@ export class DefaultRegistrar implements Registrar {
252222
}
253223

254224
for (const topology of topologies.values()) {
255-
const connection = this.components.connectionManager.getConnections(peer.id)[0]
256-
257-
if (connection == null) {
225+
if (connection.transient && topology.notifyOnTransient !== true) {
258226
continue
259227
}
260-
topology.onConnect?.(peer.id, connection)
228+
229+
topology.onConnect?.(peerId, connection)
261230
}
262231
}
263232
}

packages/libp2p/test/identify/service.node.ts

+15-11
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,15 @@ describe('identify', () => {
6565
expect(connection).to.exist()
6666

6767
// wait for identify to run on the new connection
68-
await eventPromise
68+
const identifyResult = await eventPromise
69+
70+
// should have run on the new connection
71+
expect(identifyResult).to.have.nested.property('detail.connection', connection)
6972

7073
// assert we have received certified announce addresses
71-
const peer = await libp2p.peerStore.get(remoteLibp2p.peerId)
72-
expect(peer.addresses).to.have.lengthOf(1)
73-
expect(peer.addresses[0].isCertified).to.be.true('did not receive certified address via identify')
74-
expect(peer.addresses[0].multiaddr.toString()).to.startWith('/dns4/localhost/', 'did not receive announce address via identify')
74+
expect(identifyResult).to.have.deep.nested.property('detail.signedPeerRecord.addresses', [
75+
multiaddr(`/dns4/localhost/tcp/${REMOTE_PORT}`)
76+
], 'did not receive announce address via identify')
7577
})
7678

7779
it('should run identify automatically for inbound connections', async () => {
@@ -88,14 +90,16 @@ describe('identify', () => {
8890
const connection = await remoteLibp2p.dial(multiaddr(`/ip4/127.0.0.1/tcp/${LOCAL_PORT}/p2p/${libp2p.peerId.toString()}`))
8991
expect(connection).to.exist()
9092

91-
// wait for identify to run on the new connection
92-
await eventPromise
93+
// wait for identify to run
94+
const identifyResult = await eventPromise
95+
96+
// should have run on the new connection
97+
expect(identifyResult).to.have.nested.property('detail.connection', connection)
9398

9499
// assert we have received certified announce addresses
95-
const peer = await libp2p.peerStore.get(remoteLibp2p.peerId)
96-
expect(peer.addresses).to.have.lengthOf(1)
97-
expect(peer.addresses[0].isCertified).to.be.true('did not receive certified address via identify')
98-
expect(peer.addresses[0].multiaddr.toString()).to.startWith('/dns4/localhost/', 'did not receive announce address via identify')
100+
expect(identifyResult).to.have.deep.nested.property('detail.signedPeerRecord.addresses', [
101+
multiaddr(`/dns4/localhost/tcp/${LOCAL_PORT}`)
102+
], 'did not receive announce address via identify')
99103
})
100104

101105
it('should identify connection on dial and get proper announce addresses', async () => {

0 commit comments

Comments
 (0)