
Commit b57bca4

fix: ensure all listeners are properly closed on tcp shutdown (#2058)
Co-authored-by: Chad Nehemiah <[email protected]>
1 parent ae36e86 commit b57bca4

File tree: 4 files changed (+306, -162 lines)

packages/libp2p/src/transport-manager.ts (+13, -3)
@@ -262,12 +262,22 @@ export class DefaultTransportManager implements TransportManager, Startable {
    * If a transport has any running listeners, they will be closed.
    */
   async remove (key: string): Promise<void> {
-    log('removing %s', key)
+    const listeners = this.listeners.get(key) ?? []
+    log.trace('removing transport %s', key)
 
     // Close any running listeners
-    for (const listener of this.listeners.get(key) ?? []) {
-      await listener.close()
+    const tasks = []
+    log.trace('closing listeners for %s', key)
+    while (listeners.length > 0) {
+      const listener = listeners.pop()
+
+      if (listener == null) {
+        continue
+      }
+
+      tasks.push(listener.close())
     }
+    await Promise.all(tasks)
 
     this.transports.delete(key)
     this.listeners.delete(key)

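The change above replaces a sequential `await listener.close()` loop with a batch of close promises awaited together, so a slow or failing listener no longer delays, or prevents, closing the others. A minimal, self-contained sketch of that pattern, using illustrative names (`Closable`, `closeAll`) rather than the actual libp2p interfaces:

```ts
// Minimal sketch (not the libp2p API): start every close() first, then await
// them together so a slow or failing listener cannot block the others.
interface Closable {
  close(): Promise<void>
}

async function closeAll (listeners: Closable[]): Promise<void> {
  const tasks: Array<Promise<void>> = []

  while (listeners.length > 0) {
    const listener = listeners.pop()

    if (listener == null) {
      continue
    }

    // close() is invoked immediately; the promise is awaited below
    tasks.push(listener.close())
  }

  // Rejects if any close() rejects, but only after every close() has been started
  await Promise.all(tasks)
}
```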
packages/transport-tcp/src/listener.ts (+76, -38)
@@ -1,4 +1,5 @@
 import net from 'net'
+import { CodeError } from '@libp2p/interface/errors'
 import { EventEmitter, CustomEvent } from '@libp2p/interface/events'
 import { logger } from '@libp2p/logger'
 import { CODE_P2P } from './constants.js'
@@ -46,17 +47,25 @@ interface Context extends TCPCreateListenerOptions {
   closeServerOnMaxConnections?: CloseServerOnMaxConnectionsOpts
 }
 
-const SERVER_STATUS_UP = 1
-const SERVER_STATUS_DOWN = 0
-
 export interface TCPListenerMetrics {
   status: MetricGroup
   errors: CounterGroup
   events: CounterGroup
 }
 
-type Status = { started: false } | {
-  started: true
+enum TCPListenerStatusCode {
+  /**
+   * The server object is initialized but we don't know the listening address yet,
+   * or the server has been stopped manually; it can only be resumed by calling listen()
+   **/
+  INACTIVE = 0,
+  ACTIVE = 1,
+  /* While connection limits are exceeded */
+  PAUSED = 2,
+}
+
+type Status = { code: TCPListenerStatusCode.INACTIVE } | {
+  code: Exclude<TCPListenerStatusCode, TCPListenerStatusCode.INACTIVE>
  listeningAddr: Multiaddr
  peerId: string | null
  netConfig: NetConfig
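The new `Status` type is a discriminated union keyed on `code`: once a status is known not to be `INACTIVE`, TypeScript guarantees `listeningAddr`, `peerId` and `netConfig` are present, which is why later hunks can drop the `this.status.started ? ... : undefined` checks. A standalone sketch with simplified, illustrative types (the real union uses `Multiaddr` and `NetConfig`):

```ts
// Illustrative sketch only; not the listener's actual types.
enum StatusCode {
  INACTIVE = 0,
  ACTIVE = 1,
  PAUSED = 2
}

type Status =
  | { code: StatusCode.INACTIVE }
  | { code: Exclude<StatusCode, StatusCode.INACTIVE>, listeningAddr: string }

function describe (status: Status): string {
  if (status.code === StatusCode.INACTIVE) {
    return 'not listening'
  }

  // Narrowed to the ACTIVE | PAUSED branch, so listeningAddr is always defined
  return `listening on ${status.listeningAddr}`
}
```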
@@ -66,7 +75,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
   private readonly server: net.Server
   /** Keep track of open connections to destroy in case of timeout */
   private readonly connections = new Set<MultiaddrConnection>()
-  private status: Status = { started: false }
+  private status: Status = { code: TCPListenerStatusCode.INACTIVE }
   private metrics?: TCPListenerMetrics
   private addr: string
 
@@ -88,7 +97,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
     if (context.closeServerOnMaxConnections != null) {
       // Sanity check options
       if (context.closeServerOnMaxConnections.closeAbove < context.closeServerOnMaxConnections.listenBelow) {
-        throw Error('closeAbove must be >= listenBelow')
+        throw new CodeError('closeAbove must be >= listenBelow', 'ERROR_CONNECTION_LIMITS')
       }
     }
 
@@ -133,7 +142,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
           }
 
           this.metrics?.status.update({
-            [this.addr]: SERVER_STATUS_UP
+            [this.addr]: TCPListenerStatusCode.ACTIVE
          })
        }
 
@@ -145,13 +154,22 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
       })
       .on('close', () => {
         this.metrics?.status.update({
-          [this.addr]: SERVER_STATUS_DOWN
+          [this.addr]: this.status.code
         })
-        this.dispatchEvent(new CustomEvent('close'))
+
+        // If this event is emitted, the transport manager will remove the listener from its cache.
+        // In the meantime, if connections are dropped, the listener will start listening again
+        // and the transport manager will not be able to close the server
+        if (this.status.code !== TCPListenerStatusCode.PAUSED) {
+          this.dispatchEvent(new CustomEvent('close'))
+        }
       })
   }
 
   private onSocket (socket: net.Socket): void {
+    if (this.status.code !== TCPListenerStatusCode.ACTIVE) {
+      throw new CodeError('Server is not listening yet', 'ERR_SERVER_NOT_RUNNING')
+    }
     // Avoid uncaught errors caused by unstable connections
     socket.on('error', err => {
       log('socket error', err)
@@ -161,7 +179,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
     let maConn: MultiaddrConnection
     try {
       maConn = toMultiaddrConnection(socket, {
-        listeningAddr: this.status.started ? this.status.listeningAddr : undefined,
+        listeningAddr: this.status.listeningAddr,
         socketInactivityTimeout: this.context.socketInactivityTimeout,
         socketCloseTimeout: this.context.socketCloseTimeout,
         metrics: this.metrics?.events,
@@ -189,9 +207,9 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
       ) {
         // The most likely case of error is that the port taken by this application is bound by
         // another process while the server is closed. In that case there's not much
-        // we can do. netListen() will be called again every time a connection is dropped, which
+        // we can do. resume() will be called again every time a connection is dropped, which
         // acts as an eventual retry mechanism. onListenError allows the consumer to act on this.
-        this.netListen().catch(e => {
+        this.resume().catch(e => {
           log.error('error attempting to listen server once connection count under limit', e)
           this.context.closeServerOnMaxConnections?.onListenError?.(e as Error)
         })
@@ -206,7 +224,9 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
       this.context.closeServerOnMaxConnections != null &&
       this.connections.size >= this.context.closeServerOnMaxConnections.closeAbove
     ) {
-      this.netClose()
+      this.pause(false).catch(e => {
+        log.error('error attempting to close server once connection count over limit', e)
+      })
     }
 
     this.dispatchEvent(new CustomEvent<Connection>('connection', { detail: conn }))
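The `pause(false)` call above is driven by the `closeServerOnMaxConnections` options that appear elsewhere in this diff (`closeAbove`, `listenBelow`, `onListenError`). A hedged usage sketch, assuming the package's `tcp()` factory export; the threshold values are arbitrary and the exact option shape may differ between `@libp2p/tcp` releases:

```ts
import { tcp } from '@libp2p/tcp'

// Pause the listener above 200 open connections, resume below 150,
// and log any error raised while trying to resume.
const transport = tcp({
  closeServerOnMaxConnections: {
    closeAbove: 200,
    listenBelow: 150,
    onListenError: (err: Error) => {
      console.error('failed to resume paused TCP listener', err)
    }
  }
})
```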
@@ -232,7 +252,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
   }
 
   getAddrs (): Multiaddr[] {
-    if (!this.status.started) {
+    if (this.status.code === TCPListenerStatusCode.INACTIVE) {
       return []
     }
 
@@ -264,35 +284,44 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
   }
 
   async listen (ma: Multiaddr): Promise<void> {
-    if (this.status.started) {
-      throw Error('server is already listening')
+    if (this.status.code === TCPListenerStatusCode.ACTIVE || this.status.code === TCPListenerStatusCode.PAUSED) {
+      throw new CodeError('server is already listening', 'ERR_SERVER_ALREADY_LISTENING')
     }
 
     const peerId = ma.getPeerId()
     const listeningAddr = peerId == null ? ma.decapsulateCode(CODE_P2P) : ma
     const { backlog } = this.context
 
-    this.status = {
-      started: true,
-      listeningAddr,
-      peerId,
-      netConfig: multiaddrToNetConfig(listeningAddr, { backlog })
-    }
+    try {
+      this.status = {
+        code: TCPListenerStatusCode.ACTIVE,
+        listeningAddr,
+        peerId,
+        netConfig: multiaddrToNetConfig(listeningAddr, { backlog })
+      }
 
-    await this.netListen()
+      await this.resume()
+    } catch (err) {
+      this.status = { code: TCPListenerStatusCode.INACTIVE }
+      throw err
+    }
   }
 
   async close (): Promise<void> {
-    await Promise.all(
-      Array.from(this.connections.values()).map(async maConn => { await attemptClose(maConn) })
-    )
-
-    // netClose already checks if server.listening
-    this.netClose()
+    // Close the connections and the server at the same time to avoid any race condition
+    await Promise.all([
+      Promise.all(Array.from(this.connections.values()).map(async maConn => attemptClose(maConn))),
+      this.pause(true).catch(e => {
+        log.error('error attempting to close server once connection count over limit', e)
+      })
+    ])
   }
 
-  private async netListen (): Promise<void> {
-    if (!this.status.started || this.server.listening) {
+  /**
+   * Can resume a stopped server or start an inert one
+   */
+  private async resume (): Promise<void> {
+    if (this.server.listening || this.status.code === TCPListenerStatusCode.INACTIVE) {
      return
    }
 
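The rewritten `close()` tears down existing connections and stops the server concurrently, and attaches a `.catch()` to the server shutdown so a failure there is logged rather than rejecting the combined `Promise.all` before connection cleanup finishes. A simplified sketch with illustrative names, not the listener implementation:

```ts
// Simplified sketch: close all connections and stop the server concurrently.
async function shutdown (
  connections: Array<{ close(): Promise<void> }>,
  stopServer: () => Promise<void>
): Promise<void> {
  await Promise.all([
    // Close every tracked connection
    Promise.all(connections.map(async conn => conn.close())),
    // Stop accepting new connections; log instead of rethrowing
    stopServer().catch(err => {
      console.error('error stopping server', err)
    })
  ])
}
```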
@@ -304,11 +333,17 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
       this.server.listen(netConfig, resolve)
     })
 
+    this.status = { ...this.status, code: TCPListenerStatusCode.ACTIVE }
     log('Listening on %s', this.server.address())
   }
 
-  private netClose (): void {
-    if (!this.status.started || !this.server.listening) {
+  private async pause (permanent: boolean): Promise<void> {
+    if (!this.server.listening && this.status.code === TCPListenerStatusCode.PAUSED && permanent) {
+      this.status = { code: TCPListenerStatusCode.INACTIVE }
+      return
+    }
+
+    if (!this.server.listening || this.status.code !== TCPListenerStatusCode.ACTIVE) {
       return
     }
 
@@ -326,9 +361,12 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
     // Stops the server from accepting new connections and keeps existing connections.
     // The 'close' event is only emitted when all connections are ended.
     // The optional callback will be called once the 'close' event occurs.
-    //
-    // NOTE: Since we want to keep existing connections and have checked `!this.server.listening` it's not necessary
-    // to pass a callback to close.
-    this.server.close()
+
+    // We need to set this status before closing the server, so other procedures are aware
+    // of it while the server is closing
+    this.status = permanent ? { code: TCPListenerStatusCode.INACTIVE } : { ...this.status, code: TCPListenerStatusCode.PAUSED }
+    await new Promise<void>((resolve, reject) => {
+      this.server.close(err => { (err != null) ? reject(err) : resolve() })
+    })
   }
 }
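The new `pause()` promisifies Node's `net.Server#close()`, whose error-first callback receives an `Error` (for example when the server was not open), so callers such as `close()` can await the shutdown. A small standalone sketch of that wrapper:

```ts
import net from 'net'

// Wrap the callback-style net.Server#close() in a promise so it can be awaited.
async function closeServer (server: net.Server): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    server.close(err => { (err != null) ? reject(err) : resolve() })
  })
}
```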
