Skip to content

Commit 102c2f0

Browse files
committed
stream: always defer 'readable' with nextTick
Fixes: #3203
1 parent f89ee06 commit 102c2f0

8 files changed

+118
-57
lines changed

lib/_stream_readable.js

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -508,16 +508,15 @@ function emitReadable(stream) {
508508
if (!state.emittedReadable) {
509509
debug('emitReadable', state.flowing);
510510
state.emittedReadable = true;
511-
if (state.sync)
512-
process.nextTick(emitReadable_, stream);
513-
else
514-
emitReadable_(stream);
511+
process.nextTick(emitReadable_, stream);
515512
}
516513
}
517514

518515
function emitReadable_(stream) {
516+
var state = stream._readableState;
519517
debug('emit readable');
520518
stream.emit('readable');
519+
state.needReadable = !state.flowing && !state.ended;
521520
flow(stream);
522521
}
523522

@@ -644,6 +643,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
644643
debug('ondata');
645644
increasedAwaitDrain = false;
646645
var ret = dest.write(chunk);
646+
debug('dest.write', ret);
647647
if (false === ret && !increasedAwaitDrain) {
648648
// If the user unpiped during `dest.write()`, it is possible
649649
// to get stuck in a permanently paused state if that write
@@ -824,8 +824,8 @@ function resume(stream, state) {
824824
}
825825

826826
function resume_(stream, state) {
827+
debug('resume', state.reading);
827828
if (!state.reading) {
828-
debug('resume read 0');
829829
stream.read(0);
830830
}
831831

test/parallel/test-net-end-close.js

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,22 @@ const uv = process.binding('uv');
88
const s = new net.Socket({
99
handle: {
1010
readStart: function() {
11-
process.nextTick(() => this.onread(uv.UV_EOF, null));
11+
setImmediate(() => this.onread(uv.UV_EOF, null));
1212
},
13-
close: (cb) => process.nextTick(cb)
13+
close: (cb) => setImmediate(cb)
1414
},
1515
writable: false
1616
});
1717
assert.strictEqual(s, s.resume());
1818

1919
const events = [];
2020

21-
s.on('end', () => events.push('end'));
22-
s.on('close', () => events.push('close'));
21+
s.on('end', () => {
22+
events.push('end');
23+
});
24+
s.on('close', () => {
25+
events.push('close');
26+
});
2327

2428
process.on('exit', () => {
2529
assert.deepStrictEqual(events, [ 'end', 'close' ]);

test/parallel/test-stream-pipe-await-drain-push-while-write.js

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,32 +3,24 @@ const common = require('../common');
33
const stream = require('stream');
44
const assert = require('assert');
55

6-
const awaitDrainStates = [
7-
1, // after first chunk before callback
8-
1, // after second chunk before callback
9-
0 // resolving chunk pushed after first chunk, awaitDrain is decreased
10-
];
11-
12-
// A writable stream which pushes data onto the stream which pipes into it,
13-
// but only the first time it's written to. Since it's not paused at this time,
14-
// a second write will occur. If the pipe increases awaitDrain twice, we'll
15-
// never get subsequent chunks because 'drain' is only emitted once.
166
const writable = new stream.Writable({
177
write: common.mustCall(function(chunk, encoding, cb) {
18-
if (chunk.length === 32 * 1024) { // first chunk
19-
const beforePush = readable._readableState.awaitDrain;
20-
readable.push(Buffer.alloc(34 * 1024)); // above hwm
21-
// We should check if awaitDrain counter is increased.
22-
const afterPush = readable._readableState.awaitDrain;
23-
assert.strictEqual(afterPush - beforePush, 1,
24-
'Counter is not increased for awaitDrain');
25-
}
26-
278
assert.strictEqual(
28-
awaitDrainStates.shift(),
299
readable._readableState.awaitDrain,
10+
0,
3011
'State variable awaitDrain is not correct.'
3112
);
13+
14+
if (chunk.length === 32 * 1024) { // first chunk
15+
readable.push(Buffer.alloc(34 * 1024)); // above hwm
16+
// We should check if awaitDrain counter is increased in the next
17+
// tick, because awaitDrain is incremented after this method finished
18+
process.nextTick(() => {
19+
assert.strictEqual(readable._readableState.awaitDrain, 1,
20+
'Counter is not increased for awaitDrain');
21+
});
22+
}
23+
3224
cb();
3325
}, 3)
3426
});

test/parallel/test-stream-readable-emittedReadable.js

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,30 +10,33 @@ const readable = new Readable({
1010
// Initialized to false.
1111
assert.strictEqual(readable._readableState.emittedReadable, false);
1212

13+
const expected = [Buffer.from('foobar'), Buffer.from('quo'), null];
1314
readable.on('readable', common.mustCall(() => {
1415
// emittedReadable should be true when the readable event is emitted
1516
assert.strictEqual(readable._readableState.emittedReadable, true);
16-
readable.read();
17+
assert.deepStrictEqual(readable.read(), expected.shift());
1718
// emittedReadable is reset to false during read()
1819
assert.strictEqual(readable._readableState.emittedReadable, false);
19-
}, 4));
20+
}, 3));
2021

2122
// When the first readable listener is just attached,
2223
// emittedReadable should be false
2324
assert.strictEqual(readable._readableState.emittedReadable, false);
2425

25-
// Each one of these should trigger a readable event.
26+
// These trigger a single 'readable', as things are batched up
2627
process.nextTick(common.mustCall(() => {
2728
readable.push('foo');
2829
}));
2930
process.nextTick(common.mustCall(() => {
3031
readable.push('bar');
3132
}));
32-
process.nextTick(common.mustCall(() => {
33+
34+
// these triggers two readable events
35+
setImmediate(common.mustCall(() => {
3336
readable.push('quo');
34-
}));
35-
process.nextTick(common.mustCall(() => {
36-
readable.push(null);
37+
process.nextTick(common.mustCall(() => {
38+
readable.push(null);
39+
}));
3740
}));
3841

3942
const noRead = new Readable({

test/parallel/test-stream-readable-needReadable.js

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,17 @@ asyncReadable.on('readable', common.mustCall(() => {
3838
// then we need to notify the reader on future changes.
3939
assert.strictEqual(asyncReadable._readableState.needReadable, true);
4040
}
41-
}, 3));
41+
}, 2));
4242

4343
process.nextTick(common.mustCall(() => {
4444
asyncReadable.push('foooo');
4545
}));
4646
process.nextTick(common.mustCall(() => {
4747
asyncReadable.push('bar');
4848
}));
49-
process.nextTick(common.mustCall(() => {
49+
setImmediate(common.mustCall(() => {
5050
asyncReadable.push(null);
51+
assert.strictEqual(asyncReadable._readableState.needReadable, false);
5152
}));
5253

5354
const flowing = new Readable({
@@ -84,13 +85,13 @@ slowProducer.on('readable', common.mustCall(() => {
8485

8586
process.nextTick(common.mustCall(() => {
8687
slowProducer.push('foo');
87-
}));
88-
process.nextTick(common.mustCall(() => {
89-
slowProducer.push('foo');
90-
}));
91-
process.nextTick(common.mustCall(() => {
92-
slowProducer.push('foo');
93-
}));
94-
process.nextTick(common.mustCall(() => {
95-
slowProducer.push(null);
88+
process.nextTick(common.mustCall(() => {
89+
slowProducer.push('foo');
90+
process.nextTick(common.mustCall(() => {
91+
slowProducer.push('foo');
92+
process.nextTick(common.mustCall(() => {
93+
slowProducer.push(null);
94+
}));
95+
}));
96+
}));
9697
}));
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const { Readable } = require('stream');
6+
7+
const MAX = 42;
8+
const BATCH = 10;
9+
10+
const readable = new Readable({
11+
objectMode: true,
12+
read: common.mustCall(function() {
13+
console.log('>> READ');
14+
fetchData((err, data) => {
15+
if (err) {
16+
this.destroy(err);
17+
return;
18+
}
19+
20+
if (data.length === 0) {
21+
console.log('pushing null');
22+
this.push(null);
23+
return;
24+
}
25+
26+
console.log('pushing');
27+
data.forEach((d) => this.push(d));
28+
});
29+
}, Math.floor(MAX / BATCH) + 2)
30+
});
31+
32+
let i = 0;
33+
function fetchData(cb) {
34+
if (i > MAX) {
35+
setTimeout(cb, 10, null, []);
36+
} else {
37+
const array = [];
38+
const max = i + BATCH;
39+
for (; i < max; i++) {
40+
array.push(i);
41+
}
42+
setTimeout(cb, 10, null, array);
43+
}
44+
}
45+
46+
readable.on('readable', () => {
47+
let data;
48+
console.log('readable emitted');
49+
while (data = readable.read()) {
50+
console.log(data);
51+
}
52+
});
53+
54+
readable.on('end', () => {
55+
assert.strictEqual(i, (Math.floor(MAX / BATCH) + 1) * BATCH);
56+
});

test/parallel/test-stream-readable-reading-readingMore.js

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,19 @@ function onStreamEnd() {
2929
assert.strictEqual(state.reading, false);
3030
}
3131

32+
const expected = [
33+
true, // stream is not ended
34+
false // stream is ended
35+
];
36+
3237
readable.on('readable', common.mustCall(() => {
33-
// 'readable' always gets called before 'end'
34-
// since 'end' hasn't been emitted, more data could be incoming
35-
assert.strictEqual(state.readingMore, true);
38+
assert.strictEqual(state.readingMore, expected.shift());
3639

3740
// if the stream has ended, we shouldn't be reading
3841
assert.strictEqual(state.ended, !state.reading);
3942

40-
if (readable.read() === null) // reached end of stream
43+
const data = readable.read();
44+
if (data === null) // reached end of stream
4145
process.nextTick(common.mustCall(onStreamEnd, 1));
4246
}, 2));
4347

test/parallel/test-stream2-transform.js

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -306,25 +306,26 @@ const Transform = require('_stream_transform');
306306
pt.write(Buffer.from('foog'));
307307
pt.write(Buffer.from('bark'));
308308

309-
assert.strictEqual(emits, 1);
309+
assert.strictEqual(emits, 0);
310310
assert.strictEqual(pt.read(5).toString(), 'foogb');
311311
assert.strictEqual(String(pt.read(5)), 'null');
312+
assert.strictEqual(emits, 0);
312313

313314
pt.write(Buffer.from('bazy'));
314315
pt.write(Buffer.from('kuel'));
315316

316-
assert.strictEqual(emits, 2);
317+
assert.strictEqual(emits, 0);
317318
assert.strictEqual(pt.read(5).toString(), 'arkba');
318319
assert.strictEqual(pt.read(5).toString(), 'zykue');
319320
assert.strictEqual(pt.read(5), null);
320321

321322
pt.end();
322323

323-
assert.strictEqual(emits, 3);
324+
assert.strictEqual(emits, 0);
324325
assert.strictEqual(pt.read(5).toString(), 'l');
325326
assert.strictEqual(pt.read(5), null);
326327

327-
assert.strictEqual(emits, 3);
328+
assert.strictEqual(emits, 0);
328329
}
329330

330331
{
@@ -338,7 +339,7 @@ const Transform = require('_stream_transform');
338339
pt.write(Buffer.from('foog'));
339340
pt.write(Buffer.from('bark'));
340341

341-
assert.strictEqual(emits, 1);
342+
assert.strictEqual(emits, 0);
342343
assert.strictEqual(pt.read(5).toString(), 'foogb');
343344
assert.strictEqual(pt.read(5), null);
344345

@@ -352,7 +353,7 @@ const Transform = require('_stream_transform');
352353
pt.once('readable', common.mustCall(function() {
353354
assert.strictEqual(pt.read(5).toString(), 'l');
354355
assert.strictEqual(pt.read(5), null);
355-
assert.strictEqual(emits, 4);
356+
assert.strictEqual(emits, 3);
356357
}));
357358
pt.end();
358359
}));

0 commit comments

Comments
 (0)