From 38a593b0f3bc4fa52ed9216d75a98bbf7ab5bd9e Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Thu, 30 May 2019 17:58:55 +0200 Subject: [PATCH] events: add EventEmitter.on to async iterate over events Fixes: https://github.com/nodejs/node/issues/27847 PR-URL: https://github.com/nodejs/node/pull/27994 Reviewed-By: Benjamin Gruenbaum Reviewed-By: James M Snell Reviewed-By: Gus Caplan Reviewed-By: Anna Henningsen Reviewed-By: Rich Trott --- doc/api/events.md | 35 +++ lib/events.js | 104 ++++++++ test/parallel/test-event-on-async-iterator.js | 223 ++++++++++++++++++ 3 files changed, 362 insertions(+) create mode 100644 test/parallel/test-event-on-async-iterator.js diff --git a/doc/api/events.md b/doc/api/events.md index 69f309a73bcd34..1e2a7660cb901b 100644 --- a/doc/api/events.md +++ b/doc/api/events.md @@ -886,6 +886,41 @@ Value: `Symbol.for('nodejs.rejection')` See how to write a custom [rejection handler][rejection]. +## events.on(emitter, eventName) + + +* `emitter` {EventEmitter} +* `eventName` {string|symbol} The name of the event being listened for +* Returns: {AsyncIterator} that iterates `eventName` events emitted by the `emitter` + +```js +const { on, EventEmitter } = require('events'); + +(async () => { + const ee = new EventEmitter(); + + // Emit later on + process.nextTick(() => { + ee.emit('foo', 'bar'); + ee.emit('foo', 42); + }); + + for await (const event of on(ee, 'foo')) { + // The execution of this inner block is synchronous and it + // processes one event at a time (even with await). Do not use + // if concurrent execution is required. + console.log(event); // prints ['bar'] [42] + } +})(); +``` + +Returns an `AsyncIterator` that iterates `eventName` events. It will throw +if the `EventEmitter` emits `'error'`. It removes all listeners when +exiting the loop. The `value` returned by each iteration is an array +composed of the emitted event arguments. + [WHATWG-EventTarget]: https://dom.spec.whatwg.org/#interface-eventtarget [`--trace-warnings`]: cli.html#cli_trace_warnings [`EventEmitter.defaultMaxListeners`]: #events_eventemitter_defaultmaxlisteners diff --git a/lib/events.js b/lib/events.js index 6f3739c67f2a8f..7889f62d9094f3 100644 --- a/lib/events.js +++ b/lib/events.js @@ -29,12 +29,16 @@ const { ObjectCreate, ObjectDefineProperty, ObjectGetPrototypeOf, + ObjectSetPrototypeOf, ObjectKeys, Promise, + PromiseReject, + PromiseResolve, ReflectApply, ReflectOwnKeys, Symbol, SymbolFor, + SymbolAsyncIterator } = primordials; const kRejection = SymbolFor('nodejs.rejection'); @@ -62,6 +66,7 @@ function EventEmitter(opts) { } module.exports = EventEmitter; module.exports.once = once; +module.exports.on = on; // Backwards-compat with node 0.10.x EventEmitter.EventEmitter = EventEmitter; @@ -657,3 +662,102 @@ function once(emitter, name) { emitter.once(name, eventListener); }); } + +const AsyncIteratorPrototype = ObjectGetPrototypeOf( + ObjectGetPrototypeOf(async function* () {}).prototype); + +function createIterResult(value, done) { + return { value, done }; +} + +function on(emitter, event) { + const unconsumedEvents = []; + const unconsumedPromises = []; + let error = null; + let finished = false; + + const iterator = ObjectSetPrototypeOf({ + next() { + // First, we consume all unread events + const value = unconsumedEvents.shift(); + if (value) { + return PromiseResolve(createIterResult(value, false)); + } + + // Then we error, if an error happened + // This happens one time if at all, because after 'error' + // we stop listening + if (error) { + const p = PromiseReject(error); + // Only the first element errors + error = null; + return p; + } + + // If the iterator is finished, resolve to done + if (finished) { + return PromiseResolve(createIterResult(undefined, true)); + } + + // Wait until an event happens + return new Promise(function(resolve, reject) { + unconsumedPromises.push({ resolve, reject }); + }); + }, + + return() { + emitter.removeListener(event, eventHandler); + emitter.removeListener('error', errorHandler); + finished = true; + + for (const promise of unconsumedPromises) { + promise.resolve(createIterResult(undefined, true)); + } + + return PromiseResolve(createIterResult(undefined, true)); + }, + + throw(err) { + if (!err || !(err instanceof Error)) { + throw new ERR_INVALID_ARG_TYPE('EventEmitter.AsyncIterator', + 'Error', err); + } + error = err; + emitter.removeListener(event, eventHandler); + emitter.removeListener('error', errorHandler); + }, + + [SymbolAsyncIterator]() { + return this; + } + }, AsyncIteratorPrototype); + + emitter.on(event, eventHandler); + emitter.on('error', errorHandler); + + return iterator; + + function eventHandler(...args) { + const promise = unconsumedPromises.shift(); + if (promise) { + promise.resolve(createIterResult(args, false)); + } else { + unconsumedEvents.push(args); + } + } + + function errorHandler(err) { + finished = true; + + const toError = unconsumedPromises.shift(); + + if (toError) { + toError.reject(err); + } else { + // The next time we call next() + error = err; + } + + iterator.return(); + } +} diff --git a/test/parallel/test-event-on-async-iterator.js b/test/parallel/test-event-on-async-iterator.js new file mode 100644 index 00000000000000..ff5d8cdaf2aea0 --- /dev/null +++ b/test/parallel/test-event-on-async-iterator.js @@ -0,0 +1,223 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { on, EventEmitter } = require('events'); + +async function basic() { + const ee = new EventEmitter(); + process.nextTick(() => { + ee.emit('foo', 'bar'); + // 'bar' is a spurious event, we are testing + // that it does not show up in the iterable + ee.emit('bar', 24); + ee.emit('foo', 42); + }); + + const iterable = on(ee, 'foo'); + + const expected = [['bar'], [42]]; + + for await (const event of iterable) { + const current = expected.shift(); + + assert.deepStrictEqual(current, event); + + if (expected.length === 0) { + break; + } + } + assert.strictEqual(ee.listenerCount('foo'), 0); + assert.strictEqual(ee.listenerCount('error'), 0); +} + +async function error() { + const ee = new EventEmitter(); + const _err = new Error('kaboom'); + process.nextTick(() => { + ee.emit('error', _err); + }); + + const iterable = on(ee, 'foo'); + let looped = false; + let thrown = false; + + try { + // eslint-disable-next-line no-unused-vars + for await (const event of iterable) { + looped = true; + } + } catch (err) { + thrown = true; + assert.strictEqual(err, _err); + } + assert.strictEqual(thrown, true); + assert.strictEqual(looped, false); +} + +async function errorDelayed() { + const ee = new EventEmitter(); + const _err = new Error('kaboom'); + process.nextTick(() => { + ee.emit('foo', 42); + ee.emit('error', _err); + }); + + const iterable = on(ee, 'foo'); + const expected = [[42]]; + let thrown = false; + + try { + for await (const event of iterable) { + const current = expected.shift(); + assert.deepStrictEqual(current, event); + } + } catch (err) { + thrown = true; + assert.strictEqual(err, _err); + } + assert.strictEqual(thrown, true); + assert.strictEqual(ee.listenerCount('foo'), 0); + assert.strictEqual(ee.listenerCount('error'), 0); +} + +async function throwInLoop() { + const ee = new EventEmitter(); + const _err = new Error('kaboom'); + + process.nextTick(() => { + ee.emit('foo', 42); + }); + + try { + for await (const event of on(ee, 'foo')) { + assert.deepStrictEqual(event, [42]); + throw _err; + } + } catch (err) { + assert.strictEqual(err, _err); + } + + assert.strictEqual(ee.listenerCount('foo'), 0); + assert.strictEqual(ee.listenerCount('error'), 0); +} + +async function next() { + const ee = new EventEmitter(); + const iterable = on(ee, 'foo'); + + process.nextTick(function() { + ee.emit('foo', 'bar'); + ee.emit('foo', 42); + iterable.return(); + }); + + const results = await Promise.all([ + iterable.next(), + iterable.next(), + iterable.next() + ]); + + assert.deepStrictEqual(results, [{ + value: ['bar'], + done: false + }, { + value: [42], + done: false + }, { + value: undefined, + done: true + }]); + + assert.deepStrictEqual(await iterable.next(), { + value: undefined, + done: true + }); +} + +async function nextError() { + const ee = new EventEmitter(); + const iterable = on(ee, 'foo'); + const _err = new Error('kaboom'); + process.nextTick(function() { + ee.emit('error', _err); + }); + const results = await Promise.allSettled([ + iterable.next(), + iterable.next(), + iterable.next() + ]); + assert.deepStrictEqual(results, [{ + status: 'rejected', + reason: _err + }, { + status: 'fulfilled', + value: { + value: undefined, + done: true + } + }, { + status: 'fulfilled', + value: { + value: undefined, + done: true + } + }]); + assert.strictEqual(ee.listeners('error').length, 0); +} + +async function iterableThrow() { + const ee = new EventEmitter(); + const iterable = on(ee, 'foo'); + + process.nextTick(() => { + ee.emit('foo', 'bar'); + ee.emit('foo', 42); // lost in the queue + iterable.throw(_err); + }); + + const _err = new Error('kaboom'); + let thrown = false; + + assert.throws(() => { + // No argument + iterable.throw(); + }, { + message: 'The "EventEmitter.AsyncIterator" property must be' + + ' an instance of Error. Received undefined', + name: 'TypeError' + }); + + const expected = [['bar'], [42]]; + + try { + for await (const event of iterable) { + assert.deepStrictEqual(event, expected.shift()); + } + } catch (err) { + thrown = true; + assert.strictEqual(err, _err); + } + assert.strictEqual(thrown, true); + assert.strictEqual(expected.length, 0); + assert.strictEqual(ee.listenerCount('foo'), 0); + assert.strictEqual(ee.listenerCount('error'), 0); +} + +async function run() { + const funcs = [ + basic, + error, + errorDelayed, + throwInLoop, + next, + nextError, + iterableThrow + ]; + + for (const fn of funcs) { + await fn(); + } +} + +run().then(common.mustCall());