Skip to content

Commit ff533c7

Browse files
committed
fix: ensure dht query is aborted on early exit
If query results are consumed from a `for await..of`-style loop, and that loop is exited from before the results are complete, ensure we abort any running sub-queries.
1 parent 3687f1e commit ff533c7

File tree

2 files changed

+72
-22
lines changed

2 files changed

+72
-22
lines changed

packages/kad-dht/src/query/manager.ts

+24-13
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
import { AbortError, TypedEventEmitter, CustomEvent, setMaxListeners } from '@libp2p/interface'
1+
import { TypedEventEmitter, CustomEvent, setMaxListeners } from '@libp2p/interface'
22
import { PeerSet } from '@libp2p/peer-collections'
33
import { anySignal } from 'any-signal'
44
import merge from 'it-merge'
5+
import { raceSignal } from 'race-signal'
56
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
67
import {
78
ALPHA, K, DEFAULT_QUERY_TIMEOUT
@@ -127,7 +128,16 @@ export class QueryManager implements Startable {
127128
}
128129
}
129130

130-
const signal = anySignal([this.shutDownController.signal, options.signal])
131+
// if the user breaks out of a for..await of loop iterating over query
132+
// results we need to cancel any in-flight network requests
133+
const queryEarlyExitController = new AbortController()
134+
setMaxListeners(Infinity, queryEarlyExitController.signal)
135+
136+
const signal = anySignal([
137+
this.shutDownController.signal,
138+
queryEarlyExitController.signal,
139+
options.signal
140+
])
131141

132142
// this signal will get listened to for every invocation of queryFunc
133143
// so make sure we don't make a lot of noise in the logs
@@ -138,19 +148,13 @@ export class QueryManager implements Startable {
138148
// query a subset of peers up to `kBucketSize / 2` in length
139149
const startTime = Date.now()
140150
const cleanUp = new TypedEventEmitter<CleanUpEvents>()
151+
let queryFinished = false
141152

142153
try {
143154
if (options.isSelfQuery !== true && this.initialQuerySelfHasRun != null) {
144155
log('waiting for initial query-self query before continuing')
145156

146-
await Promise.race([
147-
new Promise((resolve, reject) => {
148-
signal.addEventListener('abort', () => {
149-
reject(new AbortError('Query was aborted before self-query ran'))
150-
})
151-
}),
152-
this.initialQuerySelfHasRun.promise
153-
])
157+
await raceSignal(this.initialQuerySelfHasRun.promise, signal)
154158

155159
this.initialQuerySelfHasRun = undefined
156160
}
@@ -192,19 +196,26 @@ export class QueryManager implements Startable {
192196

193197
// Execute the query along each disjoint path and yield their results as they become available
194198
for await (const event of merge(...paths)) {
195-
yield event
196-
197199
if (event.name === 'QUERY_ERROR') {
198-
log('error', event.error)
200+
log.error('query error', event.error)
199201
}
202+
203+
yield event
200204
}
205+
206+
queryFinished = true
201207
} catch (err: any) {
202208
if (!this.running && err.code === 'ERR_QUERY_ABORTED') {
203209
// ignore query aborted errors that were thrown during query manager shutdown
204210
} else {
205211
throw err
206212
}
207213
} finally {
214+
if (!queryFinished) {
215+
log('query exited early')
216+
queryEarlyExitController.abort()
217+
}
218+
208219
signal.clear()
209220

210221
this.queries--

packages/kad-dht/test/query.spec.ts

+48-9
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import { QueryManager, type QueryManagerInit } from '../src/query/manager.js'
1919
import { convertBuffer } from '../src/utils.js'
2020
import { createPeerId, createPeerIds } from './utils/create-peer-id.js'
2121
import { sortClosestPeers } from './utils/sort-closest-peers.js'
22-
import type { QueryFunc } from '../src/query/types.js'
22+
import type { QueryContext, QueryFunc } from '../src/query/types.js'
2323
import type { RoutingTable } from '../src/routing-table/index.js'
2424
import type { PeerId } from '@libp2p/interface'
2525

@@ -29,12 +29,9 @@ interface TopologyEntry {
2929
value?: Uint8Array
3030
closerPeers?: number[]
3131
event: QueryEvent
32+
context?: QueryContext
3233
}
33-
type Topology = Record<string, {
34-
delay?: number | undefined
35-
error?: Error | undefined
36-
event: QueryEvent
37-
}>
34+
type Topology = Record<string, TopologyEntry>
3835

3936
describe('QueryManager', () => {
4037
let ourPeerId: PeerId
@@ -55,7 +52,7 @@ describe('QueryManager', () => {
5552
}
5653

5754
function createTopology (opts: Record<number, { delay?: number, error?: Error, value?: Uint8Array, closerPeers?: number[] }>): Topology {
58-
const topology: Record<string, { delay?: number, error?: Error, event: QueryEvent }> = {}
55+
const topology: Topology = {}
5956

6057
Object.keys(opts).forEach(key => {
6158
const id = parseInt(key)
@@ -94,9 +91,12 @@ describe('QueryManager', () => {
9491
return topology
9592
}
9693

97-
function createQueryFunction (topology: Record<string, { delay?: number, event: QueryEvent }>): QueryFunc {
98-
const queryFunc: QueryFunc = async function * ({ peer }) {
94+
function createQueryFunction (topology: Topology): QueryFunc {
95+
const queryFunc: QueryFunc = async function * (context) {
96+
const { peer } = context
97+
9998
const res = topology[peer.toString()]
99+
res.context = context
100100

101101
if (res.delay != null) {
102102
await delay(res.delay)
@@ -870,4 +870,43 @@ describe('QueryManager', () => {
870870

871871
await manager.stop()
872872
})
873+
874+
it('should abort the query if we break out of the loop early', async () => {
875+
const manager = new QueryManager({
876+
peerId: ourPeerId,
877+
logger: defaultLogger()
878+
}, {
879+
...defaultInit(),
880+
disjointPaths: 2
881+
})
882+
await manager.start()
883+
884+
// 1 -> 0 [pathComplete]
885+
// 4 -> 3 [delay] -> 2 [pathComplete]
886+
const topology = createTopology({
887+
// quick value path
888+
0: { value: uint8ArrayFromString('true') },
889+
1: { closerPeers: [0] },
890+
// slow value path
891+
2: { value: uint8ArrayFromString('true') },
892+
3: { delay: 100, closerPeers: [2] },
893+
4: { closerPeers: [3] }
894+
})
895+
896+
routingTable.closestPeers.returns([peers[1], peers[4]])
897+
898+
for await (const event of manager.run(key, createQueryFunction(topology))) {
899+
if (event.name === 'VALUE') {
900+
expect(event.from.toString()).to.equal(peers[0].toString())
901+
902+
// break out of loop early
903+
break
904+
}
905+
}
906+
907+
// should have aborted query on slow path
908+
expect(topology[peers[3].toString()]).to.have.nested.property('context.signal.aborted', true)
909+
910+
await manager.stop()
911+
})
873912
})

0 commit comments

Comments
 (0)