Skip to content

Commit b330729

Browse files
author
Nitzan Uziely
committed
stream: add AbortSignal support to finished
Add AbortSignal support to stream.finished
1 parent 88d9268 commit b330729

File tree

3 files changed

+107
-1
lines changed

3 files changed

+107
-1
lines changed

doc/api/stream.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1604,6 +1604,9 @@ changes:
16041604
* `writable` {boolean} When set to `false`, the callback will be called when
16051605
the stream ends even though the stream might still be writable.
16061606
**Default**: `true`.
1607+
* `signal` {AbortSignal} allows aborting the wait for the stream finish. The
1608+
underlying stream will *not* be aborted if the signal is aborted. The
1609+
callback will get called with an `AbortError`.
16071610
* `callback` {Function} A callback function that takes an optional error
16081611
argument.
16091612
* Returns: {Function} A cleanup function which removes all registered

lib/internal/streams/end-of-stream.js

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,18 @@ const {
77
FunctionPrototype,
88
FunctionPrototypeCall,
99
} = primordials;
10+
const {
11+
codes,
12+
AbortError,
13+
} = require('internal/errors');
1014
const {
1115
ERR_STREAM_PREMATURE_CLOSE
12-
} = require('internal/errors').codes;
16+
} = codes;
1317
const { once } = require('internal/util');
1418
const {
1519
validateFunction,
1620
validateObject,
21+
validateAbortSignal,
1722
} = require('internal/validators');
1823

1924
function isSocket(stream) {
@@ -76,6 +81,7 @@ function eos(stream, options, callback) {
7681
validateObject(options, 'options');
7782
}
7883
validateFunction(callback, 'callback');
84+
validateAbortSignal(options.signal, 'options.signal');
7985

8086
callback = once(callback);
8187

@@ -199,6 +205,20 @@ function eos(stream, options, callback) {
199205
});
200206
}
201207

208+
if (options.signal && !closed) {
209+
const abort = () => callback(new AbortError());
210+
if (options.signal.aborted) {
211+
process.nextTick(abort);
212+
} else {
213+
const originalCallback = callback;
214+
callback = once((...args) => {
215+
options.signal.removeEventListener('abort', abort);
216+
originalCallback(...args);
217+
});
218+
options.signal.addEventListener('abort', abort);
219+
}
220+
}
221+
202222
return function() {
203223
callback = nop;
204224
stream.removeListener('aborted', onclose);

test/parallel/test-stream-finished.js

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,89 @@ const http = require('http');
9292
run();
9393
}
9494

95+
{
96+
// Check pre-cancelled
97+
const signal = new EventTarget();
98+
signal.aborted = true;
99+
100+
const rs = Readable.from((function* () {})());
101+
finished(rs, { signal }, common.mustCall((err) => {
102+
assert.strictEqual(err.name, 'AbortError');
103+
}));
104+
}
105+
106+
{
107+
// Check cancelled before the stream ends sync.
108+
const ac = new AbortController();
109+
const { signal } = ac;
110+
111+
const rs = Readable.from((function* () {})());
112+
finished(rs, { signal }, common.mustCall((err) => {
113+
assert.strictEqual(err.name, 'AbortError');
114+
}));
115+
116+
ac.abort();
117+
}
118+
119+
{
120+
// Check cancelled before the stream ends async.
121+
const ac = new AbortController();
122+
const { signal } = ac;
123+
124+
const rs = Readable.from((function* () {})());
125+
setTimeout(() => ac.abort(), 1);
126+
finished(rs, { signal }, common.mustCall((err) => {
127+
assert.strictEqual(err.name, 'AbortError');
128+
}));
129+
}
130+
131+
{
132+
// Check cancelled after doesn't throw.
133+
const ac = new AbortController();
134+
const { signal } = ac;
135+
136+
const rs = Readable.from((function* () {
137+
yield 5;
138+
setImmediate(() => ac.abort());
139+
})());
140+
rs.resume();
141+
finished(rs, { signal }, common.mustCall((err) => {
142+
assert.strictEqual(err, undefined);
143+
}));
144+
}
145+
146+
{
147+
// Promisified abort works
148+
const finishedPromise = promisify(finished);
149+
async function run() {
150+
const ac = new AbortController();
151+
const { signal } = ac;
152+
const rs = Readable.from((function* () {})());
153+
setImmediate(() => ac.abort());
154+
await finishedPromise(rs, { signal });
155+
}
156+
157+
run().catch(common.mustCall((err) => {
158+
assert.strictEqual(err.name, 'AbortError');
159+
}));
160+
}
161+
162+
{
163+
// Promisified pre-aborted works
164+
const finishedPromise = promisify(finished);
165+
async function run() {
166+
const signal = new EventTarget();
167+
signal.aborted = true;
168+
const rs = Readable.from((function* () {})());
169+
await finishedPromise(rs, { signal });
170+
}
171+
172+
run().catch(common.mustCall((err) => {
173+
assert.strictEqual(err.name, 'AbortError');
174+
}));
175+
}
176+
177+
95178
{
96179
const rs = fs.createReadStream('file-does-not-exist');
97180

0 commit comments

Comments
 (0)