Skip to content

Commit 63aad0a

Browse files
committed
stream: support dispose in writable
Add support to Symbol.asyncDispose in writable streams. Additionally add a test for writable, transform and duplex streams who inherit from readable/writable to avoid breakage.
1 parent 71d7707 commit 63aad0a

File tree

5 files changed

+77
-14
lines changed

5 files changed

+77
-14
lines changed

doc/api/stream.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -921,6 +921,17 @@ added: v12.3.0
921921

922922
Getter for the property `objectMode` of a given `Writable` stream.
923923

924+
##### `writable[Symbol.asyncDispose]()`
925+
926+
<!-- YAML
927+
added: REPLACEME
928+
-->
929+
930+
> Stability: 1 - Experimental
931+
932+
Calls [`writable.destroy()`][writable-destroy] with an `AbortError` and returns
933+
a promise that fulfills when the stream is finished.
934+
924935
##### `writable.write(chunk[, encoding][, callback])`
925936

926937
<!-- YAML

lib/internal/streams/writable.js

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@ const {
3232
ObjectDefineProperty,
3333
ObjectDefineProperties,
3434
ObjectSetPrototypeOf,
35+
Promise,
3536
StringPrototypeToLowerCase,
3637
Symbol,
38+
SymbolAsyncDispose,
3739
SymbolHasInstance,
3840
} = primordials;
3941

@@ -44,6 +46,7 @@ const EE = require('events');
4446
const Stream = require('internal/streams/legacy').Stream;
4547
const { Buffer } = require('buffer');
4648
const destroyImpl = require('internal/streams/destroy');
49+
const eos = require('internal/streams/end-of-stream');
4750

4851
const {
4952
addAbortSignal,
@@ -54,16 +57,19 @@ const {
5457
getDefaultHighWaterMark,
5558
} = require('internal/streams/state');
5659
const {
57-
ERR_INVALID_ARG_TYPE,
58-
ERR_METHOD_NOT_IMPLEMENTED,
59-
ERR_MULTIPLE_CALLBACK,
60-
ERR_STREAM_CANNOT_PIPE,
61-
ERR_STREAM_DESTROYED,
62-
ERR_STREAM_ALREADY_FINISHED,
63-
ERR_STREAM_NULL_VALUES,
64-
ERR_STREAM_WRITE_AFTER_END,
65-
ERR_UNKNOWN_ENCODING,
66-
} = require('internal/errors').codes;
60+
AbortError,
61+
codes: {
62+
ERR_INVALID_ARG_TYPE,
63+
ERR_METHOD_NOT_IMPLEMENTED,
64+
ERR_MULTIPLE_CALLBACK,
65+
ERR_STREAM_CANNOT_PIPE,
66+
ERR_STREAM_DESTROYED,
67+
ERR_STREAM_ALREADY_FINISHED,
68+
ERR_STREAM_NULL_VALUES,
69+
ERR_STREAM_WRITE_AFTER_END,
70+
ERR_UNKNOWN_ENCODING,
71+
},
72+
} = require('internal/errors');
6773

6874
const { errorOrDestroy } = destroyImpl;
6975

@@ -477,7 +483,7 @@ function onwrite(stream, er) {
477483
// rather just increase a counter, to improve performance and avoid
478484
// memory allocations.
479485
if (state.afterWriteTickInfo !== null &&
480-
state.afterWriteTickInfo.cb === cb) {
486+
state.afterWriteTickInfo.cb === cb) {
481487
state.afterWriteTickInfo.count++;
482488
} else {
483489
state.afterWriteTickInfo = { count: 1, cb, stream, state };
@@ -538,9 +544,9 @@ function errorBuffer(state) {
538544
// If there's something in the buffer waiting, then process it.
539545
function clearBuffer(stream, state) {
540546
if (state.corked ||
541-
state.bufferProcessing ||
542-
state.destroyed ||
543-
!state.constructed) {
547+
state.bufferProcessing ||
548+
state.destroyed ||
549+
!state.constructed) {
544550
return;
545551
}
546552

@@ -934,3 +940,12 @@ Writable.fromWeb = function(writableStream, options) {
934940
Writable.toWeb = function(streamWritable) {
935941
return lazyWebStreams().newWritableStreamFromStreamWritable(streamWritable);
936942
};
943+
944+
Writable.prototype[SymbolAsyncDispose] = function() {
945+
let error;
946+
if (!this.destroyed) {
947+
error = this.readableEnded ? null : new AbortError();
948+
this.destroy(error);
949+
}
950+
return new Promise((resolve, reject) => eos(this, (err) => (err && err !== error ? reject(err) : resolve(null))));
951+
};

test/parallel/test-stream-duplex-destroy.js

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,3 +255,17 @@ const assert = require('assert');
255255
duplex.on('close', common.mustCall());
256256
controller.abort();
257257
}
258+
{
259+
// Check Symbol.asyncDispose
260+
const duplex = new Duplex({
261+
write(chunk, enc, cb) { cb(); },
262+
read() {},
263+
});
264+
let count = 0;
265+
duplex.on('error', common.mustCall((e) => {
266+
assert.strictEqual(count++, 0); // Ensure not called twice
267+
assert.strictEqual(e.name, 'AbortError');
268+
}));
269+
duplex.on('close', common.mustCall());
270+
duplex[Symbol.asyncDispose]().then(common.mustCall());
271+
}

test/parallel/test-stream-transform-destroy.js

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,3 +141,14 @@ const assert = require('assert');
141141

142142
transform.destroy();
143143
}
144+
145+
{
146+
const transform = new Transform({
147+
transform(chunk, enc, cb) {}
148+
});
149+
transform.on('error', common.mustCall((err) => {
150+
assert.strictEqual(err.name, 'AbortError');
151+
}));
152+
transform.on('clocse', common.mustCall());
153+
transform[Symbol.asyncDispose]().then(common.mustCall());
154+
}

test/parallel/test-stream-writable-destroy.js

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,3 +487,15 @@ const assert = require('assert');
487487
}));
488488
s.destroy(_err);
489489
}
490+
491+
{
492+
const write = new Writable({
493+
write(chunk, enc, cb) { cb(); }
494+
});
495+
496+
write.on('error', common.mustCall((e) => {
497+
assert.strictEqual(e.name, 'AbortError');
498+
assert.strictEqual(write.destroyed, true);
499+
}));
500+
write[Symbol.asyncDispose]().then(common.mustCall());
501+
}

0 commit comments

Comments
 (0)