Skip to content

Commit c8eb517

Browse files
committed
fix: do not allow autodial to run in parallel
Auto dial attempts to keep the number of active connections above a configurable minimum, but dialling is slow, so the dial queue can grow quite large. If left unchecked, this can cause out-of-memory errors (OOMs).

- Adds a `running` boolean to the autodialler to prevent it from running in parallel.
- Adds configurable limits for the auto dial and circuit relay reservation queue lengths.

Fixes #1800
Fixes ipfs/helia#143
1 parent c042b5b commit c8eb517

File tree

9 files changed

+183
-76
lines changed

9 files changed

+183
-76
lines changed

package.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@
135135
"@libp2p/interface-peer-id": "^2.0.1",
136136
"@libp2p/interface-peer-info": "^1.0.3",
137137
"@libp2p/interface-peer-routing": "^1.1.0",
138-
"@libp2p/interface-peer-store": "^2.0.3",
138+
"@libp2p/interface-peer-store": "^2.0.4",
139139
"@libp2p/interface-pubsub": "^4.0.0",
140140
"@libp2p/interface-record": "^2.0.6",
141141
"@libp2p/interface-registrar": "^2.0.3",
@@ -149,7 +149,7 @@
149149
"@libp2p/peer-id": "^2.0.0",
150150
"@libp2p/peer-id-factory": "^2.0.0",
151151
"@libp2p/peer-record": "^5.0.0",
152-
"@libp2p/peer-store": "^8.1.0",
152+
"@libp2p/peer-store": "^8.2.0",
153153
"@libp2p/topology": "^4.0.1",
154154
"@libp2p/tracked-map": "^3.0.0",
155155
"@libp2p/utils": "^3.0.10",

src/circuit-relay/transport/discovery.ts

+11-4
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,17 @@ export class RelayDiscovery extends EventEmitter<RelayDiscoveryEvents> implement
8989
*/
9090
async discover (): Promise<void> {
9191
log('searching peer store for relays')
92-
const peers = (await this.peerStore.all())
93-
// filter by a list of peers supporting RELAY_V2_HOP and ones we are not listening on
94-
.filter(({ protocols }) => protocols.includes(RELAY_V2_HOP_CODEC))
95-
.sort(() => Math.random() - 0.5)
92+
const peers = (await this.peerStore.all({
93+
filters: [
94+
// filter by a list of peers supporting RELAY_V2_HOP and ones we are not listening on
95+
(peer) => {
96+
return peer.protocols.includes(RELAY_V2_HOP_CODEC)
97+
}
98+
],
99+
orders: [
100+
() => Math.random() < 0.5 ? 1 : -1
101+
]
102+
}))
96103

97104
for (const peer of peers) {
98105
log('found relay peer %p in content peer store', peer.id)

src/circuit-relay/transport/reservation-store.ts

+12
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ export interface RelayStoreInit {
5050
* How many discovered relays to allow in the reservation store
5151
*/
5252
discoverRelays?: number
53+
54+
/**
55+
* Limit the number of potential relays we will dial (default: 100)
56+
*/
57+
maxReservationQueueLength?: number
5358
}
5459

5560
export type RelayType = 'discovered' | 'configured'
@@ -74,6 +79,7 @@ export class ReservationStore extends EventEmitter<ReservationStoreEvents> imple
7479
private readonly reserveQueue: PQueue
7580
private readonly reservations: PeerMap<RelayEntry>
7681
private readonly maxDiscoveredRelays: number
82+
private readonly maxReservationQueueLength: number
7783
private started: boolean
7884

7985
constructor (components: RelayStoreComponents, init?: RelayStoreInit) {
@@ -86,6 +92,7 @@ export class ReservationStore extends EventEmitter<ReservationStoreEvents> imple
8692
this.events = components.events
8793
this.reservations = new PeerMap()
8894
this.maxDiscoveredRelays = init?.discoverRelays ?? 0
95+
this.maxReservationQueueLength = init?.maxReservationQueueLength ?? 100
8996
this.started = false
9097

9198
// ensure we don't listen on multiple relays simultaneously
@@ -130,6 +137,11 @@ export class ReservationStore extends EventEmitter<ReservationStoreEvents> imple
130137
return
131138
}
132139

140+
if (this.reserveQueue.size > this.maxReservationQueueLength) {
141+
log('not adding relay as the queue is full')
142+
return
143+
}
144+
133145
log('add relay %p', peerId)
134146

135147
await this.reserveQueue.add(async () => {

src/connection-manager/auto-dial.ts

+61-28
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { logger } from '@libp2p/logger'
22
import { PeerMap, PeerSet } from '@libp2p/peer-collections'
33
import PQueue from 'p-queue'
4-
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_INTERVAL, AUTO_DIAL_PRIORITY, MIN_CONNECTIONS } from './constants.js'
4+
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_INTERVAL, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, MIN_CONNECTIONS } from './constants.js'
55
import type { ConnectionManager } from '@libp2p/interface-connection-manager'
66
import type { Libp2pEvents } from '@libp2p/interface-libp2p'
77
import type { PeerStore } from '@libp2p/interface-peer-store'
@@ -12,6 +12,7 @@ const log = logger('libp2p:connection-manager:auto-dial')
1212

1313
interface AutoDialInit {
1414
minConnections?: number
15+
maxQueueLength?: number
1516
autoDialConcurrency?: number
1617
autoDialPriority?: number
1718
autoDialInterval?: number
@@ -25,6 +26,7 @@ interface AutoDialComponents {
2526

2627
const defaultOptions = {
2728
minConnections: MIN_CONNECTIONS,
29+
maxQueueLength: AUTO_DIAL_MAX_QUEUE_LENGTH,
2830
autoDialConcurrency: AUTO_DIAL_CONCURRENCY,
2931
autoDialPriority: AUTO_DIAL_PRIORITY,
3032
autoDialInterval: AUTO_DIAL_INTERVAL
@@ -37,8 +39,10 @@ export class AutoDial implements Startable {
3739
private readonly minConnections: number
3840
private readonly autoDialPriority: number
3941
private readonly autoDialIntervalMs: number
42+
private readonly autoDialMaxQueueLength: number
4043
private autoDialInterval?: ReturnType<typeof setInterval>
4144
private started: boolean
45+
private running: boolean
4246

4347
/**
4448
* Proactively tries to connect to known peers stored in the PeerStore.
@@ -51,7 +55,9 @@ export class AutoDial implements Startable {
5155
this.minConnections = init.minConnections ?? defaultOptions.minConnections
5256
this.autoDialPriority = init.autoDialPriority ?? defaultOptions.autoDialPriority
5357
this.autoDialIntervalMs = init.autoDialInterval ?? defaultOptions.autoDialInterval
58+
this.autoDialMaxQueueLength = init.maxQueueLength ?? defaultOptions.maxQueueLength
5459
this.started = false
60+
this.running = false
5561
this.queue = new PQueue({
5662
concurrency: init.autoDialConcurrency ?? defaultOptions.autoDialConcurrency
5763
})
@@ -73,7 +79,7 @@ export class AutoDial implements Startable {
7379
}
7480

7581
start (): void {
76-
this.autoDialInterval = setInterval(() => {
82+
this.autoDialInterval = setTimeout(() => {
7783
this.autoDial()
7884
.catch(err => {
7985
log.error('error while autodialing', err)
@@ -92,8 +98,9 @@ export class AutoDial implements Startable {
9298
stop (): void {
9399
// clear the queue
94100
this.queue.clear()
95-
clearInterval(this.autoDialInterval)
101+
clearTimeout(this.autoDialInterval)
96102
this.started = false
103+
this.running = false
97104
}
98105

99106
async autoDial (): Promise<void> {
@@ -103,47 +110,62 @@ export class AutoDial implements Startable {
103110

104111
const connections = this.connectionManager.getConnectionsMap()
105112
const numConnections = connections.size
106-
const dialQueue = new PeerSet(
107-
// @ts-expect-error boolean filter removes falsy peer IDs
108-
this.connectionManager.getDialQueue()
109-
.map(queue => queue.peerId)
110-
.filter(Boolean)
111-
)
112113

113114
// Already has enough connections
114115
if (numConnections >= this.minConnections) {
115116
log.trace('have enough connections %d/%d', numConnections, this.minConnections)
116117
return
117118
}
118119

119-
log('not enough connections %d/%d - will dial peers to increase the number of connections', numConnections, this.minConnections)
120+
if (this.queue.size > this.autoDialMaxQueueLength) {
121+
log('not enough connections %d/%d but auto dial queue is full', numConnections, this.minConnections)
122+
return
123+
}
120124

121-
// Sort peers on whether we know protocols or public keys for them
122-
const peers = await this.peerStore.all()
125+
if (this.running) {
126+
log('not enough connections %d/%d - but skipping autodial as it is already running', numConnections, this.minConnections)
127+
return
128+
}
123129

124-
// Remove some peers
125-
const filteredPeers = peers.filter((peer) => {
126-
// Remove peers without addresses
127-
if (peer.addresses.length === 0) {
128-
return false
129-
}
130+
this.running = true
130131

131-
// remove peers we are already connected to
132-
if (connections.has(peer.id)) {
133-
return false
134-
}
132+
log('not enough connections %d/%d - will dial peers to increase the number of connections', numConnections, this.minConnections)
135133

136-
// remove peers we are already dialling
137-
if (dialQueue.has(peer.id)) {
138-
return false
139-
}
134+
const dialQueue = new PeerSet(
135+
// @ts-expect-error boolean filter removes falsy peer IDs
136+
this.connectionManager.getDialQueue()
137+
.map(queue => queue.peerId)
138+
.filter(Boolean)
139+
)
140140

141-
return true
141+
// Sort peers on whether we know protocols or public keys for them
142+
const peers = await this.peerStore.all({
143+
filters: [
144+
// Remove some peers
145+
(peer) => {
146+
// Remove peers without addresses
147+
if (peer.addresses.length === 0) {
148+
return false
149+
}
150+
151+
// remove peers we are already connected to
152+
if (connections.has(peer.id)) {
153+
return false
154+
}
155+
156+
// remove peers we are already dialling
157+
if (dialQueue.has(peer.id)) {
158+
return false
159+
}
160+
161+
return true
162+
}
163+
]
142164
})
143165

144166
// shuffle the peers - this is so peers with the same tag values will be
145167
// dialled in a different order each time
146-
const shuffledPeers = filteredPeers.sort(() => Math.random() > 0.5 ? 1 : -1)
168+
const shuffledPeers = peers.sort(() => Math.random() > 0.5 ? 1 : -1)
147169

148170
// Sort shuffled peers by tag value
149171
const peerValues = new PeerMap<number>()
@@ -196,5 +218,16 @@ export class AutoDial implements Startable {
196218
log.error('could not connect to peerStore stored peer', err)
197219
})
198220
}
221+
222+
this.running = false
223+
224+
if (this.started) {
225+
this.autoDialInterval = setTimeout(() => {
226+
this.autoDial()
227+
.catch(err => {
228+
log.error('error while autodialing', err)
229+
})
230+
}, this.autoDialIntervalMs)
231+
}
199232
}
200233
}

src/connection-manager/constants.ts

+5
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ export const AUTO_DIAL_CONCURRENCY = 25
4949
*/
5050
export const AUTO_DIAL_PRIORITY = 0
5151

52+
/**
53+
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#autoDialMaxQueueLength
54+
*/
55+
export const AUTO_DIAL_MAX_QUEUE_LENGTH = 100
56+
5257
/**
5358
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#inboundConnectionThreshold
5459
*/

src/connection-manager/index.ts

+18-12
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,15 @@ import { codes } from '../errors.js'
1010
import { getPeerAddress } from '../get-peer.js'
1111
import { AutoDial } from './auto-dial.js'
1212
import { ConnectionPruner } from './connection-pruner.js'
13-
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js'
13+
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js'
1414
import { DialQueue } from './dial-queue.js'
1515
import type { Connection, MultiaddrConnection } from '@libp2p/interface-connection'
1616
import type { ConnectionGater } from '@libp2p/interface-connection-gater'
1717
import type { ConnectionManager } from '@libp2p/interface-connection-manager'
1818
import type { PendingDial, AddressSorter, Libp2pEvents } from '@libp2p/interface-libp2p'
1919
import type { Metrics } from '@libp2p/interface-metrics'
2020
import type { PeerId } from '@libp2p/interface-peer-id'
21-
import type { PeerStore } from '@libp2p/interface-peer-store'
21+
import type { Peer, PeerStore } from '@libp2p/interface-peer-store'
2222
import type { TransportManager } from '@libp2p/interface-transport'
2323
import type { AbortOptions } from '@libp2p/interfaces'
2424
import type { EventEmitter } from '@libp2p/interfaces/events'
@@ -61,6 +61,12 @@ export interface ConnectionManagerInit {
6161
*/
6262
autoDialPriority?: number
6363

64+
/**
65+
* Limit the maximum number of peers to dial when trying to keep the number of
66+
* open connections above `minConnections`. (default: 100)
67+
*/
68+
autoDialMaxQueueLength?: number
69+
6470
/**
6571
* Sort the known addresses of a peer before trying to dial, By default public
6672
* addresses will be dialled before private (e.g. loopback or LAN) addresses.
@@ -136,7 +142,8 @@ const defaultOptions = {
136142
inboundConnectionThreshold: INBOUND_CONNECTION_THRESHOLD,
137143
maxIncomingPendingConnections: MAX_INCOMING_PENDING_CONNECTIONS,
138144
autoDialConcurrency: AUTO_DIAL_CONCURRENCY,
139-
autoDialPriority: AUTO_DIAL_PRIORITY
145+
autoDialPriority: AUTO_DIAL_PRIORITY,
146+
autoDialMaxQueueLength: AUTO_DIAL_MAX_QUEUE_LENGTH
140147
}
141148

142149
export interface DefaultConnectionManagerComponents {
@@ -217,7 +224,8 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
217224
}, {
218225
minConnections,
219226
autoDialConcurrency: init.autoDialConcurrency ?? defaultOptions.autoDialConcurrency,
220-
autoDialPriority: init.autoDialPriority ?? defaultOptions.autoDialPriority
227+
autoDialPriority: init.autoDialPriority ?? defaultOptions.autoDialPriority,
228+
maxQueueLength: init.autoDialMaxQueueLength ?? defaultOptions.autoDialMaxQueueLength
221229
})
222230

223231
// controls what happens when we have too many connections
@@ -344,17 +352,15 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
344352
// re-connect to any peers with the KEEP_ALIVE tag
345353
void Promise.resolve()
346354
.then(async () => {
347-
const keepAlivePeers: PeerId[] = []
348-
349-
for (const peer of await this.peerStore.all()) {
350-
if (peer.tags.has(KEEP_ALIVE)) {
351-
keepAlivePeers.push(peer.id)
352-
}
353-
}
355+
const keepAlivePeers: Peer[] = await this.peerStore.all({
356+
filters: [(peer) => {
357+
return peer.tags.has(KEEP_ALIVE)
358+
}]
359+
})
354360

355361
await Promise.all(
356362
keepAlivePeers.map(async peer => {
357-
await this.openConnection(peer)
363+
await this.openConnection(peer.id)
358364
.catch(err => {
359365
log.error(err)
360366
})

src/upnp-nat/index.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ class UPnPNAT implements Startable {
142142
continue
143143
}
144144

145-
const client = await this._getClient()
145+
const client = this._getClient()
146146
const publicIp = this.externalAddress ?? await client.externalIp()
147147
const isPrivate = isPrivateIp(publicIp)
148148

@@ -173,7 +173,7 @@ class UPnPNAT implements Startable {
173173
}
174174
}
175175

176-
async _getClient (): Promise<NatAPI> {
176+
_getClient (): NatAPI {
177177
if (this.client != null) {
178178
return this.client
179179
}

0 commit comments

Comments
 (0)