Skip to content

Commit 0af1865

Browse files
committed
feat: async peerstore
Refactors interfaces and classes used by `libp2p-interfaces` to use the async peer store from libp2p/js-libp2p#1058 Fixes a memory leak where peer data (multiaddrs, protocols, etc) is never evicted from memory. BREAKING CHANGE: peerstore methods and pubsub start/stop are now all async
1 parent 1a442fe commit 0af1865

File tree

4 files changed

+30
-30
lines changed

4 files changed

+30
-30
lines changed

package.json

+4-4
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
"denque": "^1.5.0",
4646
"err-code": "^3.0.1",
4747
"it-pipe": "^1.1.0",
48-
"libp2p-interfaces": "^2.0.1",
48+
"libp2p-interfaces": "^4.0.2",
4949
"peer-id": "^0.16.0",
5050
"protobufjs": "^6.11.2",
5151
"uint8arrays": "^3.0.0"
@@ -70,9 +70,9 @@
7070
"eslint-plugin-promise": "^4.2.1",
7171
"eslint-plugin-standard": "^4.0.1",
7272
"it-pair": "^1.0.0",
73-
"libp2p": "^0.35.0",
74-
"libp2p-floodsub": "^0.28.0",
75-
"libp2p-interfaces-compliance-tests": "^2.0.3",
73+
"libp2p": "libp2p/js-libp2p#feat/async-peerstore",
74+
"libp2p-floodsub": "^0.29.0",
75+
"libp2p-interfaces-compliance-tests": "^4.0.2",
7676
"libp2p-mplex": "^0.10.3",
7777
"libp2p-websockets": "^0.16.1",
7878
"lodash": "^4.17.15",

test/utils/create-gossipsub.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,15 @@ const {
1010
*/
1111
async function startNode(gs) {
1212
await gs._libp2p.start()
13-
gs.start()
13+
await gs.start()
1414
}
1515

1616
/**
1717
* Stop node - gossipsub + libp2p
1818
*/
1919
async function stopNode(gs) {
2020
await gs._libp2p.stop()
21-
gs.stop()
21+
await gs.stop()
2222
}
2323

2424
async function connectGossipsub (gs1, gs2) {

test/utils/index.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ const createFloodsubNode = async (libp2p, shouldStart = false, options) => {
3030

3131
if (shouldStart) {
3232
await libp2p.start()
33-
fs.start()
33+
await fs.start()
3434
}
3535

3636
return fs

ts/index.ts

+23-23
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,7 @@ class Gossipsub extends Pubsub {
407407
async _processRpc (id: string, peerStreams: PeerStreams, rpc: RPC): Promise<boolean> {
408408
if (await super._processRpc(id, peerStreams, rpc)) {
409409
if (rpc.control) {
410-
this._processRpcControlMessage(id, rpc.control)
410+
await this._processRpcControlMessage(id, rpc.control)
411411
}
412412
return true
413413
}
@@ -420,14 +420,14 @@ class Gossipsub extends Pubsub {
420420
* @param {RPC.IControlMessage} controlMsg
421421
* @returns {void}
422422
*/
423-
_processRpcControlMessage (id: string, controlMsg: RPC.IControlMessage): void {
423+
async _processRpcControlMessage (id: string, controlMsg: RPC.IControlMessage): Promise<void> {
424424
if (!controlMsg) {
425425
return
426426
}
427427

428428
const iwant = controlMsg.ihave ? this._handleIHave(id, controlMsg.ihave) : []
429429
const ihave = controlMsg.iwant ? this._handleIWant(id, controlMsg.iwant) : []
430-
const prune = controlMsg.graft ? this._handleGraft(id, controlMsg.graft) : []
430+
const prune = controlMsg.graft ? await this._handleGraft(id, controlMsg.graft) : []
431431
controlMsg.prune && this._handlePrune(id, controlMsg.prune)
432432

433433
if (!iwant.length && !ihave.length && !prune.length) {
@@ -652,9 +652,9 @@ class Gossipsub extends Pubsub {
652652
* Handles Graft messages
653653
* @param {string} id peer id
654654
* @param {Array<RPC.IControlGraft>} graft
655-
* @return {Array<RPC.IControlPrune>}
655+
* @return {Promise<RPC.IControlPrune[]>}
656656
*/
657-
_handleGraft (id: string, graft: RPC.IControlGraft[]): RPC.IControlPrune[] {
657+
async _handleGraft (id: string, graft: RPC.IControlGraft[]): Promise<RPC.IControlPrune[]> {
658658
const prune: string[] = []
659659
const score = this.score.score(id)
660660
const now = this._now()
@@ -741,7 +741,7 @@ class Gossipsub extends Pubsub {
741741
return []
742742
}
743743

744-
return prune.map(topic => this._makePrune(id, topic, doPX))
744+
return Promise.all(prune.map(topic => this._makePrune(id, topic, doPX)))
745745
}
746746

747747
/**
@@ -939,10 +939,10 @@ class Gossipsub extends Pubsub {
939939
* Mounts the gossipsub protocol onto the libp2p node and sends our
940940
* our subscriptions to every peer connected
941941
* @override
942-
* @returns {void}
942+
* @returns {Promise<void>}
943943
*/
944-
start (): void {
945-
super.start()
944+
async start (): Promise<void> {
945+
await super.start()
946946
this.heartbeat.start()
947947
this.score.start()
948948
// connect to direct peers
@@ -956,10 +956,10 @@ class Gossipsub extends Pubsub {
956956
/**
957957
* Unmounts the gossipsub protocol and shuts down every connection
958958
* @override
959-
* @returns {void}
959+
* @returns {Promise<void>}
960960
*/
961-
stop (): void {
962-
super.stop()
961+
async stop (): Promise<void> {
962+
await super.stop()
963963
this.heartbeat.stop()
964964
this.score.stop()
965965

@@ -1188,11 +1188,11 @@ class Gossipsub extends Pubsub {
11881188
* Sends a PRUNE message to a peer
11891189
* @param {string} id peer id
11901190
* @param {string} topic
1191-
* @returns {void}
1191+
* @returns {Promise<void>}
11921192
*/
1193-
_sendPrune (id: string, topic: string): void {
1193+
async _sendPrune (id: string, topic: string): Promise<void> {
11941194
const prune = [
1195-
this._makePrune(id, topic, this._options.doPX)
1195+
await this._makePrune(id, topic, this._options.doPX)
11961196
]
11971197

11981198
const out = createGossipRpc([], { prune })
@@ -1255,23 +1255,23 @@ class Gossipsub extends Pubsub {
12551255
* @param {Map<string, Array<string>>} tograft peer id => topic[]
12561256
* @param {Map<string, Array<string>>} toprune peer id => topic[]
12571257
*/
1258-
_sendGraftPrune (tograft: Map<string, string[]>, toprune: Map<string, string[]>, noPX: Map<string, boolean>): void {
1258+
async _sendGraftPrune (tograft: Map<string, string[]>, toprune: Map<string, string[]>, noPX: Map<string, boolean>): Promise<void> {
12591259
const doPX = this._options.doPX
12601260
for (const [id, topics] of tograft) {
12611261
const graft = topics.map((topicID) => ({ topicID }))
12621262
let prune: RPC.IControlPrune[] = []
12631263
// If a peer also has prunes, process them now
12641264
const pruning = toprune.get(id)
12651265
if (pruning) {
1266-
prune = pruning.map((topicID) => this._makePrune(id, topicID, doPX && !noPX.get(id)))
1266+
prune = await Promise.all(pruning.map((topicID) => this._makePrune(id, topicID, doPX && !noPX.get(id))))
12671267
toprune.delete(id)
12681268
}
12691269

12701270
const outRpc = createGossipRpc([], { graft, prune })
12711271
this._sendRpc(id, outRpc)
12721272
}
12731273
for (const [id, topics] of toprune) {
1274-
const prune = topics.map((topicID) => this._makePrune(id, topicID, doPX && !noPX.get(id)))
1274+
const prune = await Promise.all(topics.map((topicID) => this._makePrune(id, topicID, doPX && !noPX.get(id))))
12751275
const outRpc = createGossipRpc([], { prune })
12761276
this._sendRpc(id, outRpc)
12771277
}
@@ -1392,9 +1392,9 @@ class Gossipsub extends Pubsub {
13921392
* @param {string} id
13931393
* @param {string} topic
13941394
* @param {boolean} doPX
1395-
* @returns {RPC.IControlPrune}
1395+
* @returns {Promise<RPC.IControlPrune>}
13961396
*/
1397-
_makePrune (id: string, topic: string, doPX: boolean): RPC.IControlPrune {
1397+
async _makePrune (id: string, topic: string, doPX: boolean): Promise<RPC.IControlPrune> {
13981398
if (this.peers.get(id)!.protocol === constants.GossipsubIDv10) {
13991399
// Gossipsub v1.0 -- no backoff, the peer won't be able to parse it anyway
14001400
return {
@@ -1411,17 +1411,17 @@ class Gossipsub extends Pubsub {
14111411
const peers = getGossipPeers(this, topic, constants.GossipsubPrunePeers, (xid: string): boolean => {
14121412
return xid !== id && this.score.score(xid) >= 0
14131413
})
1414-
peers.forEach(p => {
1414+
for (const p of peers) {
14151415
// see if we have a signed record to send back; if we don't, just send
14161416
// the peer ID and let the pruned peer find them in the DHT -- we can't trust
14171417
// unsigned address records through PX anyways
14181418
// Finding signed records in the DHT is not supported at the time of writing in js-libp2p
14191419
const peerId = PeerId.createFromB58String(p)
14201420
px.push({
14211421
peerID: peerId.toBytes(),
1422-
signedPeerRecord: this._libp2p.peerStore.addressBook.getRawEnvelope(peerId)
1422+
signedPeerRecord: await this._libp2p.peerStore.addressBook.getRawEnvelope(peerId)
14231423
})
1424-
})
1424+
}
14251425
}
14261426
return {
14271427
topicID: topic,

0 commit comments

Comments
 (0)