Skip to content

Commit 7938420

Browse files
Nitzan UzielyLinkgoron
authored andcommitted
stream: add AbortSignal support to finished
Add AbortSignal support to stream.finished
1 parent d345ac9 commit 7938420

File tree

3 files changed

+114
-2
lines changed

3 files changed

+114
-2
lines changed

doc/api/stream.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1578,6 +1578,9 @@ further errors except from `_destroy()` may be emitted as `'error'`.
15781578
<!-- YAML
15791579
added: v10.0.0
15801580
changes:
1581+
- version: REPLACEME
1582+
pr-url: https://github.com/nodejs/node/pull/37354
1583+
description: The `signal` option was added.
15811584
- version: v14.0.0
15821585
pr-url: https://github.com/nodejs/node/pull/32158
15831586
description: The `finished(stream, cb)` will wait for the `'close'` event
@@ -1604,6 +1607,10 @@ changes:
16041607
* `writable` {boolean} When set to `false`, the callback will be called when
16051608
the stream ends even though the stream might still be writable.
16061609
**Default:** `true`.
1610+
* `signal` {AbortSignal} allows aborting the wait for the stream finish. The
1611+
underlying stream will *not* be aborted if the signal is aborted. The
1612+
callback will get called with an `AbortError`. All registered
1613+
listeners added by this function will also be removed.
16071614
* `callback` {Function} A callback function that takes an optional error
16081615
argument.
16091616
* Returns: {Function} A cleanup function which removes all registered

lib/internal/streams/end-of-stream.js

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,18 @@
66
const {
77
FunctionPrototype,
88
FunctionPrototypeCall,
9+
ReflectApply,
910
} = primordials;
11+
const {
12+
AbortError,
13+
codes,
14+
} = require('internal/errors');
1015
const {
1116
ERR_STREAM_PREMATURE_CLOSE
12-
} = require('internal/errors').codes;
17+
} = codes;
1318
const { once } = require('internal/util');
1419
const {
20+
validateAbortSignal,
1521
validateFunction,
1622
validateObject,
1723
} = require('internal/validators');
@@ -76,6 +82,7 @@ function eos(stream, options, callback) {
7682
validateObject(options, 'options');
7783
}
7884
validateFunction(callback, 'callback');
85+
validateAbortSignal(options.signal, 'options.signal');
7986

8087
callback = once(callback);
8188

@@ -199,7 +206,7 @@ function eos(stream, options, callback) {
199206
});
200207
}
201208

202-
return function() {
209+
const cleanup = () => {
203210
callback = nop;
204211
stream.removeListener('aborted', onclose);
205212
stream.removeListener('complete', onfinish);
@@ -213,6 +220,27 @@ function eos(stream, options, callback) {
213220
stream.removeListener('error', onerror);
214221
stream.removeListener('close', onclose);
215222
};
223+
224+
if (options.signal && !closed) {
225+
const abort = () => {
226+
// Keep it because cleanup removes it.
227+
const endCallback = callback;
228+
cleanup();
229+
FunctionPrototypeCall(endCallback, stream, new AbortError());
230+
};
231+
if (options.signal.aborted) {
232+
process.nextTick(abort);
233+
} else {
234+
const originalCallback = callback;
235+
callback = once((...args) => {
236+
options.signal.removeEventListener('abort', abort);
237+
ReflectApply(originalCallback, stream, args);
238+
});
239+
options.signal.addEventListener('abort', abort);
240+
}
241+
}
242+
243+
return cleanup;
216244
}
217245

218246
module.exports = eos;

test/parallel/test-stream-finished.js

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,83 @@ const http = require('http');
9292
run();
9393
}
9494

95+
{
96+
// Check pre-cancelled
97+
const signal = new EventTarget();
98+
signal.aborted = true;
99+
100+
const rs = Readable.from((function* () {})());
101+
finished(rs, { signal }, common.mustCall((err) => {
102+
assert.strictEqual(err.name, 'AbortError');
103+
}));
104+
}
105+
106+
{
107+
// Check cancelled before the stream ends sync.
108+
const ac = new AbortController();
109+
const { signal } = ac;
110+
111+
const rs = Readable.from((function* () {})());
112+
finished(rs, { signal }, common.mustCall((err) => {
113+
assert.strictEqual(err.name, 'AbortError');
114+
}));
115+
116+
ac.abort();
117+
}
118+
119+
{
120+
// Check cancelled before the stream ends async.
121+
const ac = new AbortController();
122+
const { signal } = ac;
123+
124+
const rs = Readable.from((function* () {})());
125+
setTimeout(() => ac.abort(), 1);
126+
finished(rs, { signal }, common.mustCall((err) => {
127+
assert.strictEqual(err.name, 'AbortError');
128+
}));
129+
}
130+
131+
{
132+
// Check cancelled after doesn't throw.
133+
const ac = new AbortController();
134+
const { signal } = ac;
135+
136+
const rs = Readable.from((function* () {
137+
yield 5;
138+
setImmediate(() => ac.abort());
139+
})());
140+
rs.resume();
141+
finished(rs, { signal }, common.mustSucceed());
142+
}
143+
144+
{
145+
// Promisified abort works
146+
const finishedPromise = promisify(finished);
147+
async function run() {
148+
const ac = new AbortController();
149+
const { signal } = ac;
150+
const rs = Readable.from((function* () {})());
151+
setImmediate(() => ac.abort());
152+
await finishedPromise(rs, { signal });
153+
}
154+
155+
assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
156+
}
157+
158+
{
159+
// Promisified pre-aborted works
160+
const finishedPromise = promisify(finished);
161+
async function run() {
162+
const signal = new EventTarget();
163+
signal.aborted = true;
164+
const rs = Readable.from((function* () {})());
165+
await finishedPromise(rs, { signal });
166+
}
167+
168+
assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
169+
}
170+
171+
95172
{
96173
const rs = fs.createReadStream('file-does-not-exist');
97174

0 commit comments

Comments
 (0)