Skip to content

Commit ca855be

Browse files
committed
stream: allow transfer of readable byte streams
Updates the `ReadableStream` constructor to mark byte streams as transferable. When transferred, byte streams become regular streams. Refs: nodejs#39062 Refs: https://streams.spec.whatwg.org/#rs-transfer
1 parent 28fe494 commit ca855be

File tree

2 files changed

+63
-9
lines changed

2 files changed

+63
-9
lines changed

lib/internal/webstreams/readablestream.js

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -251,17 +251,16 @@ class ReadableStream {
251251
this,
252252
source,
253253
extractHighWaterMark(highWaterMark, 0));
254-
return;
254+
} else {
255+
if (type !== undefined)
256+
throw new ERR_INVALID_ARG_VALUE('source.type', type);
257+
setupReadableStreamDefaultControllerFromSource(
258+
this,
259+
source,
260+
extractHighWaterMark(highWaterMark, 1),
261+
extractSizeAlgorithm(size));
255262
}
256263

257-
if (type !== undefined)
258-
throw new ERR_INVALID_ARG_VALUE('source.type', type);
259-
setupReadableStreamDefaultControllerFromSource(
260-
this,
261-
source,
262-
extractHighWaterMark(highWaterMark, 1),
263-
extractSizeAlgorithm(size));
264-
265264
// eslint-disable-next-line no-constructor-return
266265
return makeTransferable(this);
267266
}

test/parallel/test-whatwg-webstreams-transfer.js

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ const {
1515

1616
const {
1717
isReadableStream,
18+
isReadableByteStreamController,
1819
} = require('internal/webstreams/readablestream');
1920

2021
const {
@@ -25,6 +26,10 @@ const {
2526
isTransformStream,
2627
} = require('internal/webstreams/transformstream');
2728

29+
const {
30+
kState,
31+
} = require('internal/webstreams/util');
32+
2833
const {
2934
makeTransferable,
3035
kClone,
@@ -107,6 +112,56 @@ const theData = 'hello';
107112
assert(readable.locked);
108113
}
109114

115+
{
116+
const { port1, port2 } = new MessageChannel();
117+
port1.onmessageerror = common.mustNotCall();
118+
port2.onmessageerror = common.mustNotCall();
119+
120+
// This test repeats the test above, but with a readable byte stream.
121+
// Note transferring a readable byte stream results in a regular
122+
// value-oriented stream on the other side:
123+
// https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable
124+
125+
const theByteData = new Uint8Array([1, 2, 3]);
126+
127+
const readable = new ReadableStream({
128+
type: "bytes",
129+
start: common.mustCall((controller) => {
130+
// `enqueue` will detach its argument's buffer, so clone first
131+
controller.enqueue(theByteData.slice());
132+
controller.close();
133+
}),
134+
});
135+
assert(isReadableByteStreamController(readable[kState].controller));
136+
137+
port2.onmessage = common.mustCall(({ data }) => {
138+
assert(isReadableStream(data));
139+
assert(!isReadableByteStreamController(data[kState].controller));
140+
141+
const reader = data.getReader();
142+
reader.read().then(common.mustCall((chunk) => {
143+
assert.deepStrictEqual(chunk, { done: false, value: theByteData });
144+
}));
145+
146+
port2.close();
147+
});
148+
149+
port1.onmessage = common.mustCall(({ data }) => {
150+
assert(isReadableStream(data));
151+
assert(!isReadableByteStreamController(data[kState].controller));
152+
assert(!data.locked);
153+
port1.postMessage(data, [data]);
154+
assert(data.locked);
155+
});
156+
157+
assert.throws(() => port2.postMessage(readable), {
158+
code: 'ERR_MISSING_TRANSFERABLE_IN_TRANSFER_LIST',
159+
});
160+
161+
port2.postMessage(readable, [readable]);
162+
assert(readable.locked);
163+
}
164+
110165
{
111166
const { port1, port2 } = new MessageChannel();
112167
port1.onmessageerror = common.mustNotCall();

0 commit comments

Comments
 (0)