Skip to content

fix: errored stream is disturbed #1134

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Dec 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions lib/core/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ function isDestroyed (stream) {
return !stream || !!(stream.destroyed || stream[kDestroyed])
}

function isAborted (stream) {
function isReadableAborted (stream) {
const state = stream && stream._readableState
return isDestroyed(stream) && state && !state.endEmitted
}
Expand Down Expand Up @@ -244,15 +244,24 @@ function validateHandler (handler, method, upgrade) {
// A body is disturbed if it has been read from and it cannot
// be re-used without losing state or data.
function isDisturbed (body) {
const state = body && body._readableState
return !!(body && (
(stream.isDisturbed && stream.isDisturbed(body)) ||
body[kBodyUsed] ||
body.readableDidRead || (state && state.dataEmitted) ||
isAborted(body)
stream.isDisturbed
? stream.isDisturbed(body) || body[kBodyUsed] // TODO (fix): Why is body[kBodyUsed] needed?
: body[kBodyUsed] ||
body.readableDidRead ||
(body._readableState && body._readableState.dataEmitted) ||
isReadableAborted(body)
))
}

function isErrored (body) {
return !!(body && (
stream.isErrored
? stream.isErrored(body)
: /state: 'errored'/.test(nodeUtil.inspect(body)
)))
}

function getSocketInfo (socket) {
return {
localAddress: socket.localAddress,
Expand Down Expand Up @@ -310,8 +319,9 @@ module.exports = {
kEnumerableProperty,
nop,
isDisturbed,
isErrored,
toUSVString: nodeUtil.toUSVString || ((val) => `${val}`),
isAborted,
isReadableAborted,
isBlobLike,
parseOrigin,
parseURL,
Expand Down
9 changes: 3 additions & 6 deletions lib/fetch/body.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
const util = require('../core/util')
const { ReadableStreamFrom, toUSVString, isBlobLike } = require('./util')
const { FormData } = require('./formdata')
const { kState, kError } = require('./symbols')
const { kState } = require('./symbols')
const { Blob } = require('buffer')
const { kBodyUsed } = require('../core/symbols')
const assert = require('assert')
const nodeUtil = require('util')
const { NotSupportedError } = require('../core/errors')
const { isErrored } = require('../core/util')

let ReadableStream

Expand Down Expand Up @@ -187,7 +188,7 @@ function extractBody (object, keepalive = false) {
// Whenever one or more bytes are available and stream is not errored,
// enqueue a Uint8Array wrapping an ArrayBuffer containing the available
// bytes into stream.
if (!/state: 'errored'/.test(nodeUtil.inspect(stream))) {
if (!isErrored(stream)) {
controller.enqueue(new Uint8Array(value))
}
}
Expand Down Expand Up @@ -268,10 +269,6 @@ const methods = {
if (this[kState].body) {
const stream = this[kState].body.stream

if (stream[kError]) {
throw stream[kError]
}

if (util.isDisturbed(stream)) {
throw new TypeError('disturbed')
}
Expand Down
8 changes: 4 additions & 4 deletions lib/fetch/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const {
determineRequestsReferrer,
coarsenedSharedCurrentTime
} = require('./util')
const { kState, kHeaders, kGuard, kRealm, kError } = require('./symbols')
const { kState, kHeaders, kGuard, kRealm } = require('./symbols')
const { AbortError } = require('../core/errors')
const assert = require('assert')
const { safelyExtractBody, cancelBody } = require('./body')
Expand All @@ -45,6 +45,7 @@ const {
const { kHeadersList } = require('../core/symbols')
const EE = require('events')
const { PassThrough, pipeline, compose } = require('stream')
const { isErrored } = require('../core/util')

let ReadableStream

Expand Down Expand Up @@ -1531,7 +1532,6 @@ function httpNetworkFetch (
await pullAlgorithm(controller)
},
async cancel (reason) {
stream[kError] = reason
await cancelAlgorithm(reason)
}
},
Expand Down Expand Up @@ -1742,8 +1742,8 @@ function httpNetworkFetch (
controller.enqueue(new Uint8Array(bytes))

// 8. If stream is errored, then terminate the ongoing fetch.
if (stream[kError]) {
this.context.terminate({ reason: stream[kError] })
if (isErrored(stream)) {
this.context.terminate()
return
}

Expand Down
3 changes: 1 addition & 2 deletions lib/fetch/symbols.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,5 @@ module.exports = {
kSignal: Symbol('signal'),
kState: Symbol('state'),
kGuard: Symbol('guard'),
kRealm: Symbol('realm'),
kError: Symbol('error')
kRealm: Symbol('realm')
}
2 changes: 1 addition & 1 deletion test/node-fetch/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ describe('node-fetch', () => {
return expect(res.text())
.to.eventually.be.rejected
.and.be.an.instanceof(Error)
.and.have.property('name', 'AbortError')
.and.have.property('name', 'TypeError')
})
})
})
Expand Down