Skip to content

Commit a3c0413

Browse files
authored
fix: errored stream is disturbed (#1134)
* fix: errored stream is disturbed * fixup * fixup * fixup * fixup * fixup * fixup * fixup * fixup * fixup * fixup * fixup * fixup * fixup * fixup * fixup * fixup * fixup * fixup
1 parent b905cea commit a3c0413

File tree

5 files changed

+26
-20
lines changed

5 files changed

+26
-20
lines changed

lib/core/util.js

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ function isDestroyed (stream) {
150150
return !stream || !!(stream.destroyed || stream[kDestroyed])
151151
}
152152

153-
function isAborted (stream) {
153+
function isReadableAborted (stream) {
154154
const state = stream && stream._readableState
155155
return isDestroyed(stream) && state && !state.endEmitted
156156
}
@@ -244,15 +244,24 @@ function validateHandler (handler, method, upgrade) {
244244
// A body is disturbed if it has been read from and it cannot
245245
// be re-used without losing state or data.
246246
function isDisturbed (body) {
247-
const state = body && body._readableState
248247
return !!(body && (
249-
(stream.isDisturbed && stream.isDisturbed(body)) ||
250-
body[kBodyUsed] ||
251-
body.readableDidRead || (state && state.dataEmitted) ||
252-
isAborted(body)
248+
stream.isDisturbed
249+
? stream.isDisturbed(body) || body[kBodyUsed] // TODO (fix): Why is body[kBodyUsed] needed?
250+
: body[kBodyUsed] ||
251+
body.readableDidRead ||
252+
(body._readableState && body._readableState.dataEmitted) ||
253+
isReadableAborted(body)
253254
))
254255
}
255256

257+
function isErrored (body) {
258+
return !!(body && (
259+
stream.isErrored
260+
? stream.isErrored(body)
261+
: /state: 'errored'/.test(nodeUtil.inspect(body)
262+
)))
263+
}
264+
256265
function getSocketInfo (socket) {
257266
return {
258267
localAddress: socket.localAddress,
@@ -310,8 +319,9 @@ module.exports = {
310319
kEnumerableProperty,
311320
nop,
312321
isDisturbed,
322+
isErrored,
313323
toUSVString: nodeUtil.toUSVString || ((val) => `${val}`),
314-
isAborted,
324+
isReadableAborted,
315325
isBlobLike,
316326
parseOrigin,
317327
parseURL,

lib/fetch/body.js

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33
const util = require('../core/util')
44
const { ReadableStreamFrom, toUSVString, isBlobLike } = require('./util')
55
const { FormData } = require('./formdata')
6-
const { kState, kError } = require('./symbols')
6+
const { kState } = require('./symbols')
77
const { Blob } = require('buffer')
88
const { kBodyUsed } = require('../core/symbols')
99
const assert = require('assert')
1010
const nodeUtil = require('util')
1111
const { NotSupportedError } = require('../core/errors')
12+
const { isErrored } = require('../core/util')
1213

1314
let ReadableStream
1415

@@ -187,7 +188,7 @@ function extractBody (object, keepalive = false) {
187188
// Whenever one or more bytes are available and stream is not errored,
188189
// enqueue a Uint8Array wrapping an ArrayBuffer containing the available
189190
// bytes into stream.
190-
if (!/state: 'errored'/.test(nodeUtil.inspect(stream))) {
191+
if (!isErrored(stream)) {
191192
controller.enqueue(new Uint8Array(value))
192193
}
193194
}
@@ -268,10 +269,6 @@ const methods = {
268269
if (this[kState].body) {
269270
const stream = this[kState].body.stream
270271

271-
if (stream[kError]) {
272-
throw stream[kError]
273-
}
274-
275272
if (util.isDisturbed(stream)) {
276273
throw new TypeError('disturbed')
277274
}

lib/fetch/index.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ const {
3131
determineRequestsReferrer,
3232
coarsenedSharedCurrentTime
3333
} = require('./util')
34-
const { kState, kHeaders, kGuard, kRealm, kError } = require('./symbols')
34+
const { kState, kHeaders, kGuard, kRealm } = require('./symbols')
3535
const { AbortError } = require('../core/errors')
3636
const assert = require('assert')
3737
const { safelyExtractBody, cancelBody } = require('./body')
@@ -45,6 +45,7 @@ const {
4545
const { kHeadersList } = require('../core/symbols')
4646
const EE = require('events')
4747
const { PassThrough, pipeline, compose } = require('stream')
48+
const { isErrored } = require('../core/util')
4849

4950
let ReadableStream
5051

@@ -1531,7 +1532,6 @@ function httpNetworkFetch (
15311532
await pullAlgorithm(controller)
15321533
},
15331534
async cancel (reason) {
1534-
stream[kError] = reason
15351535
await cancelAlgorithm(reason)
15361536
}
15371537
},
@@ -1742,8 +1742,8 @@ function httpNetworkFetch (
17421742
controller.enqueue(new Uint8Array(bytes))
17431743

17441744
// 8. If stream is errored, then terminate the ongoing fetch.
1745-
if (stream[kError]) {
1746-
this.context.terminate({ reason: stream[kError] })
1745+
if (isErrored(stream)) {
1746+
this.context.terminate()
17471747
return
17481748
}
17491749

lib/fetch/symbols.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,5 @@ module.exports = {
66
kSignal: Symbol('signal'),
77
kState: Symbol('state'),
88
kGuard: Symbol('guard'),
9-
kRealm: Symbol('realm'),
10-
kError: Symbol('error')
9+
kRealm: Symbol('realm')
1110
}

test/node-fetch/main.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -882,7 +882,7 @@ describe('node-fetch', () => {
882882
return expect(res.text())
883883
.to.eventually.be.rejected
884884
.and.be.an.instanceof(Error)
885-
.and.have.property('name', 'AbortError')
885+
.and.have.property('name', 'TypeError')
886886
})
887887
})
888888
})

0 commit comments

Comments
 (0)