-
-
Notifications
You must be signed in to change notification settings - Fork 31.9k
events: add EventEmitter.on to async iterate over events #27994
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
fb4052a
0a1b52d
31bb823
c795045
7344bf7
c8b16cf
4542f50
10821a2
b71f016
0c38799
0e9b7fa
f1babd5
2e9ffee
346fa56
0e518be
c7cda01
35939a5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,12 +29,16 @@ const { | |
ObjectCreate, | ||
ObjectDefineProperty, | ||
ObjectGetPrototypeOf, | ||
ObjectSetPrototypeOf, | ||
ObjectKeys, | ||
Promise, | ||
mcollina marked this conversation as resolved.
Show resolved
Hide resolved
|
||
PromiseReject, | ||
PromiseResolve, | ||
ReflectApply, | ||
ReflectOwnKeys, | ||
Symbol, | ||
SymbolFor, | ||
SymbolAsyncIterator | ||
} = primordials; | ||
const kRejection = SymbolFor('nodejs.rejection'); | ||
|
||
|
@@ -62,6 +66,7 @@ function EventEmitter(opts) { | |
} | ||
module.exports = EventEmitter; | ||
module.exports.once = once; | ||
module.exports.on = on; | ||
|
||
// Backwards-compat with node 0.10.x | ||
EventEmitter.EventEmitter = EventEmitter; | ||
|
@@ -657,3 +662,102 @@ function once(emitter, name) { | |
emitter.once(name, eventListener); | ||
}); | ||
} | ||
|
||
const AsyncIteratorPrototype = ObjectGetPrototypeOf( | ||
ObjectGetPrototypeOf(async function* () {}).prototype); | ||
|
||
function createIterResult(value, done) { | ||
return { value, done }; | ||
} | ||
|
||
function on(emitter, event) { | ||
const unconsumedEvents = []; | ||
const unconsumedPromises = []; | ||
let error = null; | ||
let finished = false; | ||
|
||
const iterator = ObjectSetPrototypeOf({ | ||
next() { | ||
// First, we consume all unread events | ||
const value = unconsumedEvents.shift(); | ||
if (value) { | ||
return PromiseResolve(createIterResult(value, false)); | ||
} | ||
|
||
// Then we error, if an error happened | ||
// This happens one time if at all, because after 'error' | ||
// we stop listening | ||
if (error) { | ||
const p = PromiseReject(error); | ||
// Only the first element errors | ||
error = null; | ||
return p; | ||
} | ||
|
||
// If the iterator is finished, resolve to done | ||
if (finished) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Stupid question: how does |
||
return PromiseResolve(createIterResult(undefined, true)); | ||
} | ||
|
||
// Wait until an event happens | ||
return new Promise(function(resolve, reject) { | ||
unconsumedPromises.push({ resolve, reject }); | ||
}); | ||
}, | ||
|
||
return() { | ||
emitter.removeListener(event, eventHandler); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's possible for there to be pending |
||
emitter.removeListener('error', errorHandler); | ||
finished = true; | ||
|
||
for (const promise of unconsumedPromises) { | ||
mcollina marked this conversation as resolved.
Show resolved
Hide resolved
|
||
promise.resolve(createIterResult(undefined, true)); | ||
} | ||
|
||
return PromiseResolve(createIterResult(undefined, true)); | ||
}, | ||
|
||
throw(err) { | ||
if (!err || !(err instanceof Error)) { | ||
throw new ERR_INVALID_ARG_TYPE('EventEmitter.AsyncIterator', | ||
'Error', err); | ||
} | ||
error = err; | ||
emitter.removeListener(event, eventHandler); | ||
emitter.removeListener('error', errorHandler); | ||
}, | ||
|
||
[SymbolAsyncIterator]() { | ||
return this; | ||
} | ||
}, AsyncIteratorPrototype); | ||
mcollina marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
emitter.on(event, eventHandler); | ||
emitter.on('error', errorHandler); | ||
|
||
return iterator; | ||
|
||
function eventHandler(...args) { | ||
const promise = unconsumedPromises.shift(); | ||
if (promise) { | ||
promise.resolve(createIterResult(args, false)); | ||
} else { | ||
unconsumedEvents.push(args); | ||
} | ||
} | ||
|
||
function errorHandler(err) { | ||
finished = true; | ||
|
||
const toError = unconsumedPromises.shift(); | ||
|
||
if (toError) { | ||
toError.reject(err); | ||
} else { | ||
// The next time we call next() | ||
error = err; | ||
} | ||
|
||
iterator.return(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,223 @@ | ||
'use strict'; | ||
|
||
const common = require('../common'); | ||
const assert = require('assert'); | ||
const { on, EventEmitter } = require('events'); | ||
|
||
async function basic() { | ||
const ee = new EventEmitter(); | ||
process.nextTick(() => { | ||
ee.emit('foo', 'bar'); | ||
// 'bar' is a spurious event, we are testing | ||
// that it does not show up in the iterable | ||
ee.emit('bar', 24); | ||
ee.emit('foo', 42); | ||
mcollina marked this conversation as resolved.
Show resolved
Hide resolved
|
||
}); | ||
|
||
const iterable = on(ee, 'foo'); | ||
|
||
const expected = [['bar'], [42]]; | ||
|
||
for await (const event of iterable) { | ||
const current = expected.shift(); | ||
|
||
assert.deepStrictEqual(current, event); | ||
|
||
if (expected.length === 0) { | ||
break; | ||
} | ||
} | ||
assert.strictEqual(ee.listenerCount('foo'), 0); | ||
assert.strictEqual(ee.listenerCount('error'), 0); | ||
} | ||
|
||
async function error() { | ||
const ee = new EventEmitter(); | ||
const _err = new Error('kaboom'); | ||
process.nextTick(() => { | ||
ee.emit('error', _err); | ||
}); | ||
|
||
const iterable = on(ee, 'foo'); | ||
let looped = false; | ||
let thrown = false; | ||
|
||
try { | ||
// eslint-disable-next-line no-unused-vars | ||
for await (const event of iterable) { | ||
looped = true; | ||
} | ||
} catch (err) { | ||
thrown = true; | ||
assert.strictEqual(err, _err); | ||
} | ||
assert.strictEqual(thrown, true); | ||
assert.strictEqual(looped, false); | ||
} | ||
|
||
async function errorDelayed() { | ||
const ee = new EventEmitter(); | ||
const _err = new Error('kaboom'); | ||
process.nextTick(() => { | ||
ee.emit('foo', 42); | ||
ee.emit('error', _err); | ||
}); | ||
|
||
const iterable = on(ee, 'foo'); | ||
const expected = [[42]]; | ||
let thrown = false; | ||
|
||
try { | ||
for await (const event of iterable) { | ||
const current = expected.shift(); | ||
assert.deepStrictEqual(current, event); | ||
} | ||
} catch (err) { | ||
thrown = true; | ||
assert.strictEqual(err, _err); | ||
} | ||
assert.strictEqual(thrown, true); | ||
assert.strictEqual(ee.listenerCount('foo'), 0); | ||
assert.strictEqual(ee.listenerCount('error'), 0); | ||
} | ||
|
||
async function throwInLoop() { | ||
const ee = new EventEmitter(); | ||
const _err = new Error('kaboom'); | ||
|
||
process.nextTick(() => { | ||
ee.emit('foo', 42); | ||
}); | ||
|
||
try { | ||
for await (const event of on(ee, 'foo')) { | ||
assert.deepStrictEqual(event, [42]); | ||
throw _err; | ||
} | ||
} catch (err) { | ||
assert.strictEqual(err, _err); | ||
} | ||
|
||
assert.strictEqual(ee.listenerCount('foo'), 0); | ||
assert.strictEqual(ee.listenerCount('error'), 0); | ||
} | ||
|
||
async function next() { | ||
const ee = new EventEmitter(); | ||
const iterable = on(ee, 'foo'); | ||
|
||
process.nextTick(function() { | ||
ee.emit('foo', 'bar'); | ||
ee.emit('foo', 42); | ||
iterable.return(); | ||
}); | ||
|
||
const results = await Promise.all([ | ||
iterable.next(), | ||
iterable.next(), | ||
iterable.next() | ||
mcollina marked this conversation as resolved.
Show resolved
Hide resolved
|
||
]); | ||
|
||
assert.deepStrictEqual(results, [{ | ||
value: ['bar'], | ||
done: false | ||
}, { | ||
value: [42], | ||
done: false | ||
}, { | ||
value: undefined, | ||
done: true | ||
}]); | ||
|
||
assert.deepStrictEqual(await iterable.next(), { | ||
value: undefined, | ||
done: true | ||
}); | ||
} | ||
|
||
async function nextError() { | ||
const ee = new EventEmitter(); | ||
const iterable = on(ee, 'foo'); | ||
const _err = new Error('kaboom'); | ||
process.nextTick(function() { | ||
ee.emit('error', _err); | ||
}); | ||
const results = await Promise.allSettled([ | ||
iterable.next(), | ||
iterable.next(), | ||
iterable.next() | ||
]); | ||
assert.deepStrictEqual(results, [{ | ||
status: 'rejected', | ||
reason: _err | ||
}, { | ||
status: 'fulfilled', | ||
value: { | ||
value: undefined, | ||
done: true | ||
} | ||
}, { | ||
status: 'fulfilled', | ||
value: { | ||
value: undefined, | ||
done: true | ||
} | ||
}]); | ||
mcollina marked this conversation as resolved.
Show resolved
Hide resolved
|
||
assert.strictEqual(ee.listeners('error').length, 0); | ||
} | ||
|
||
async function iterableThrow() { | ||
const ee = new EventEmitter(); | ||
const iterable = on(ee, 'foo'); | ||
|
||
process.nextTick(() => { | ||
ee.emit('foo', 'bar'); | ||
ee.emit('foo', 42); // lost in the queue | ||
iterable.throw(_err); | ||
}); | ||
|
||
const _err = new Error('kaboom'); | ||
let thrown = false; | ||
|
||
assert.throws(() => { | ||
// No argument | ||
iterable.throw(); | ||
}, { | ||
message: 'The "EventEmitter.AsyncIterator" property must be' + | ||
' an instance of Error. Received undefined', | ||
name: 'TypeError' | ||
}); | ||
|
||
const expected = [['bar'], [42]]; | ||
|
||
try { | ||
for await (const event of iterable) { | ||
assert.deepStrictEqual(event, expected.shift()); | ||
} | ||
} catch (err) { | ||
thrown = true; | ||
assert.strictEqual(err, _err); | ||
} | ||
assert.strictEqual(thrown, true); | ||
assert.strictEqual(expected.length, 0); | ||
assert.strictEqual(ee.listenerCount('foo'), 0); | ||
assert.strictEqual(ee.listenerCount('error'), 0); | ||
} | ||
|
||
async function run() { | ||
mcollina marked this conversation as resolved.
Show resolved
Hide resolved
|
||
const funcs = [ | ||
basic, | ||
error, | ||
errorDelayed, | ||
throwInLoop, | ||
next, | ||
nextError, | ||
iterableThrow | ||
]; | ||
|
||
for (const fn of funcs) { | ||
await fn(); | ||
} | ||
} | ||
|
||
run().then(common.mustCall()); |
Uh oh!
There was an error while loading. Please reload this page.