Skip to content

Commit 54aa046

Browse files
committed
chore: abstract http message parser
1 parent cff7820 commit 54aa046

File tree

1 file changed

+93
-84
lines changed

1 file changed

+93
-84
lines changed

src/interceptors/Socket/SocketInterceptor.ts

+93-84
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ export interface SocketEventMap {
2828
]
2929
}
3030

31+
const kRequestBodyStream = Symbol('kRequestBodyStream')
32+
3133
export class SocketInterceptor extends Interceptor<SocketEventMap> {
3234
static symbol = Symbol('socket')
3335

@@ -85,8 +87,6 @@ export class SocketInterceptor extends Interceptor<SocketEventMap> {
8587
// Otherwise, listen to the original response
8688
// and forward it to the interceptor.
8789
controller.onResponse = (response, isMockedResponse) => {
88-
console.log('onResponse callback')
89-
9090
self.emitter.emit('response', {
9191
requestId,
9292
request,
@@ -130,9 +130,7 @@ class SocketController {
130130
private shouldSuppressEvents = false
131131
private suppressedEvents: Array<[event: string, ...args: Array<unknown>]> = []
132132
private request: Request
133-
private requestParser: typeof HTTPParser
134133
private requestStream?: Readable
135-
private responseParser: typeof HTTPParser
136134
private responseStream?: Readable
137135

138136
constructor(
@@ -142,58 +140,35 @@ class SocketController {
142140
) {
143141
this.url = parseSocketConnectionUrl(normalizedOptions)
144142

145-
// Create the parser later on because a single
146-
// socket can be *reused* for multiple requests.
147-
// The same way, don't free the parser.
148-
this.requestParser = new HTTPParser()
149-
this.requestParser[HTTPParser.kOnHeadersComplete] = (
150-
verionMajor: number,
151-
versionMinor: number,
152-
headers: Array<string>,
153-
idk: number,
154-
path: string,
155-
idk2: undefined,
156-
idk3: undefined,
157-
idk4: boolean
158-
) => {
159-
this.onRequestStart(path, headers)
160-
}
161-
this.requestParser[HTTPParser.kOnBody] = (chunk: Buffer) => {
162-
this.onRequestData(chunk)
163-
}
164-
this.requestParser[HTTPParser.kOnMessageComplete] = () => {
165-
this.onRequestEnd()
166-
}
167-
this.requestParser.initialize(HTTPParser.REQUEST, {})
168-
169-
this.responseParser = new HTTPParser()
170-
this.responseParser[HTTPParser.kOnHeadersComplete] = (
171-
verionMajor: number,
172-
versionMinor: number,
173-
headers: Array<string>,
174-
method: string | undefined,
175-
url: string | undefined,
176-
status: number,
177-
statusText: string,
178-
upgrade: boolean,
179-
shouldKeepAlive: boolean
180-
) => {
181-
this.onResponseStart(status, statusText, headers)
182-
}
183-
this.responseParser[HTTPParser.kOnBody] = (chunk: Buffer) => {
184-
this.onResponseData(chunk)
185-
}
186-
this.responseParser[HTTPParser.kOnMessageComplete] = () => {
187-
this.onResponseEnd()
188-
}
189-
this.responseParser.initialize(
190-
HTTPParser.RESPONSE,
191-
// Don't create any async resources here.
192-
// This has to be "HTTPINCOMINGMESSAGE" in practice.
193-
// @see https://github.com/nodejs/llhttp/issues/44#issuecomment-582499320
194-
// new HTTPServerAsyncResource('INTERCEPTORINCOMINGMESSAGE', socket)
195-
{}
196-
)
143+
const requestParser = new HttpMessageParser('request', {
144+
onHeadersComplete: (major, minor, headers, _, path) => {
145+
this.onRequestStart(path, headers)
146+
},
147+
onBody: (chunk) => {
148+
this.onRequestData(chunk)
149+
},
150+
onMessageComplete: this.onRequestEnd.bind(this),
151+
})
152+
153+
const responseParser = new HttpMessageParser('response', {
154+
onHeadersComplete: (
155+
versionMajor,
156+
versionMinor,
157+
headers,
158+
method,
159+
url,
160+
status,
161+
statusText,
162+
upgrade,
163+
keepalive
164+
) => {
165+
this.onResponseStart(status, statusText, headers)
166+
},
167+
onBody: (chunk) => {
168+
this.onResponseData(chunk)
169+
},
170+
onMessageComplete: this.onResponseEnd.bind(this),
171+
})
197172

198173
socket.emit = new Proxy(socket.emit, {
199174
apply: (target, thisArg, args) => {
@@ -209,13 +184,13 @@ class SocketController {
209184
if (this.shouldSuppressEvents) {
210185
if (args[0] === 'error') {
211186
Reflect.set(this.socket, '_hadError', false)
212-
this.suppressedEvents.push(['error', args.slice(1)])
187+
this.suppressedEvents.push(['error', ...args.slice(1)])
213188
return true
214189
}
215190

216191
// Suppress close events for errored mocked connections.
217192
if (args[0] === 'close') {
218-
this.suppressedEvents.push(['close', args.slice(1)])
193+
this.suppressedEvents.push(['close', ...args.slice(1)])
219194
return true
220195
}
221196
}
@@ -224,7 +199,7 @@ class SocketController {
224199
},
225200
})
226201

227-
socket.once('ready', () => {
202+
socket.once('connect', () => {
228203
// Notify the interceptor once the socket is ready.
229204
// The HTTP parser triggers BEFORE that.
230205
this.onRequest(this.request)
@@ -234,7 +209,7 @@ class SocketController {
234209
socket.write = new Proxy(socket.write, {
235210
apply: (target, thisArg, args) => {
236211
if (args[0] !== null) {
237-
this.requestParser.execute(
212+
requestParser.push(
238213
Buffer.isBuffer(args[0]) ? args[0] : Buffer.from(args[0])
239214
)
240215
}
@@ -246,7 +221,7 @@ class SocketController {
246221
socket.push = new Proxy(socket.push, {
247222
apply: (target, thisArg, args) => {
248223
if (args[0] !== null) {
249-
this.responseParser.execute(
224+
responseParser.push(
250225
Buffer.isBuffer(args[0]) ? args[0] : Buffer.from(args[0])
251226
)
252227
}
@@ -304,11 +279,15 @@ class SocketController {
304279
}
305280

306281
private replayErrors() {
282+
console.log('replay errors...', this.suppressedEvents)
283+
307284
if (this.suppressedEvents.length === 0) {
308285
return
309286
}
310287

311288
for (const [event, ...args] of this.suppressedEvents) {
289+
console.log('replaying event', event, ...args)
290+
312291
if (event === 'error') {
313292
Reflect.set(this.socket, '_hadError', true)
314293
}
@@ -342,6 +321,7 @@ class SocketController {
342321
method,
343322
headers,
344323
body: methodWithBody ? Readable.toWeb(this.requestStream) : null,
324+
// @ts-expect-error Not documented fetch property.
345325
duplex: methodWithBody ? 'half' : undefined,
346326
credentials: 'same-origin',
347327
})
@@ -356,8 +336,6 @@ class SocketController {
356336
}
357337

358338
private onRequestEnd() {
359-
this.requestParser.free()
360-
361339
invariant(
362340
this.requestStream,
363341
'Failed to handle the request end: request stream is missing'
@@ -376,7 +354,7 @@ class SocketController {
376354
statusText,
377355
headers: parseRawHeaders(rawHeaders),
378356
})
379-
this.onResponse(response)
357+
this.onResponse(response, false)
380358
}
381359

382360
private onResponseData(chunk: Buffer) {
@@ -388,8 +366,6 @@ class SocketController {
388366
}
389367

390368
private onResponseEnd() {
391-
this.responseParser.free()
392-
393369
invariant(
394370
this.responseStream,
395371
'Failed to handle the response end: response stream is missing'
@@ -398,6 +374,57 @@ class SocketController {
398374
}
399375
}
400376

377+
type HttpMessageParserMessageType = 'request' | 'response'
378+
interface HttpMessageParserCallbacks<T extends HttpMessageParserMessageType> {
379+
onHeadersComplete?: T extends 'request'
380+
? (
381+
versionMajor: number,
382+
versionMinor: number,
383+
headers: Array<string>,
384+
idk: number,
385+
path: string
386+
) => void
387+
: (
388+
versionMajor: number,
389+
versionMinor: number,
390+
headers: Array<string>,
391+
method: string | undefined,
392+
url: string | undefined,
393+
status: number,
394+
statusText: string,
395+
upgrade: boolean,
396+
shouldKeepAlive: boolean
397+
) => void
398+
onBody?: (chunk: Buffer) => void
399+
onMessageComplete?: () => void
400+
}
401+
402+
class HttpMessageParser<T extends HttpMessageParserMessageType> {
403+
private parser: HTTPParser
404+
405+
constructor(messageType: T, callbacks: HttpMessageParserCallbacks<T>) {
406+
this.parser = new HTTPParser()
407+
this.parser.initialize(
408+
messageType === 'request' ? HTTPParser.REQUEST : HTTPParser.RESPONSE,
409+
// Don't create any async resources here.
410+
// This has to be "HTTPINCOMINGMESSAGE" in practice.
411+
// @see https://github.com/nodejs/llhttp/issues/44#issuecomment-582499320
412+
// new HTTPServerAsyncResource('INTERCEPTORINCOMINGMESSAGE', socket)
413+
{}
414+
)
415+
this.parser[HTTPParser.kOnHeadersComplete] = callbacks.onHeadersComplete
416+
this.parser[HTTPParser.kOnMessageComplete] = callbacks.onMessageComplete
417+
}
418+
419+
public push(chunk: Buffer): void {
420+
this.parser.execute(chunk)
421+
}
422+
423+
public destroy(): void {
424+
this.parser.free()
425+
}
426+
}
427+
401428
function parseSocketConnectionUrl(
402429
options: NormalizedSocketConnectOptions
403430
): URL {
@@ -429,21 +456,3 @@ function parseRawHeaders(rawHeaders: Array<string>): Headers {
429456
}
430457
return headers
431458
}
432-
433-
// MOCKED REQUEST:
434-
// 1. lookup // mock that's OK
435-
// 2. connect
436-
// 3. ready
437-
// HAS MOCK?
438-
// -> Y: data -> close
439-
// -> N (no response, non-existing host):
440-
// -> replayErrors()
441-
// -> lookup (error), error, close
442-
443-
// BYPASSED REQUEST TO EXISTING HOST:
444-
// 1. lookup (no errors)
445-
// 2. (skip mockConnect), forward all socket events.
446-
// 3. emit "request" on the interceptor.
447-
// 4. HAS MOCK?
448-
// -> Y: respondWith: data -> close
449-
// -> N: do nothing

0 commit comments

Comments
 (0)