diff --git a/lib/events.js b/lib/events.js index a5af5ff9309c18..065ce9f816129c 100644 --- a/lib/events.js +++ b/lib/events.js @@ -616,41 +616,20 @@ function unwrapListeners(arr) { function once(emitter, name) { return new Promise((resolve, reject) => { - if (typeof emitter.addEventListener === 'function') { - // EventTarget does not have `error` event semantics like Node - // EventEmitters, we do not listen to `error` events here. - emitter.addEventListener( - name, - (...args) => { resolve(args); }, - { once: true } - ); - return; - } - - const eventListener = (...args) => { - if (errorListener !== undefined) { + const errorListener = (err) => { + emitter.removeListener(name, resolver); + reject(err); + }; + const resolver = (...args) => { + if (typeof emitter.removeListener === 'function') { emitter.removeListener('error', errorListener); } resolve(args); }; - let errorListener; - - // Adding an error listener is not optional because - // if an error is thrown on an event emitter we cannot - // guarantee that the actual event we are waiting will - // be fired. The result could be a silent way to create - // memory or file descriptor leaks, which is something - // we should avoid. + eventTargetAgnosticAddListener(emitter, name, resolver, { once: true }); if (name !== 'error') { - errorListener = (err) => { - emitter.removeListener(name, eventListener); - reject(err); - }; - - emitter.once('error', errorListener); + addErrorHandlerIfEventEmitter(emitter, errorListener, { once: true }); } - - emitter.once(name, eventListener); }); } @@ -661,6 +640,38 @@ function createIterResult(value, done) { return { value, done }; } +function addErrorHandlerIfEventEmitter(emitter, handler, flags) { + if (typeof emitter.on === 'function') { + eventTargetAgnosticAddListener(emitter, 'error', handler, flags); + } +} + +function eventTargetAgnosticRemoveListener(emitter, name, listener, flags) { + if (typeof emitter.removeListener === 'function') { + emitter.removeListener(name, listener); + } else if (typeof emitter.removeEventListener === 'function') { + emitter.removeEventListener(name, listener, flags); + } else { + throw new ERR_INVALID_ARG_TYPE('emitter', 'EventEmitter', emitter); + } +} + +function eventTargetAgnosticAddListener(emitter, name, listener, flags) { + if (typeof emitter.on === 'function') { + if (flags && flags.once) { + emitter.once(name, listener); + } else { + emitter.on(name, listener); + } + } else if (typeof emitter.addEventListener === 'function') { + // EventTarget does not have `error` event semantics like Node + // EventEmitters, we do not listen to `error` events here. + emitter.addEventListener(name, (arg) => { listener(arg); }, flags); + } else { + throw new ERR_INVALID_ARG_TYPE('emitter', 'EventEmitter', emitter); + } +} + function on(emitter, event) { const unconsumedEvents = []; const unconsumedPromises = []; @@ -697,8 +708,8 @@ function on(emitter, event) { }, return() { - emitter.removeListener(event, eventHandler); - emitter.removeListener('error', errorHandler); + eventTargetAgnosticRemoveListener(emitter, event, eventHandler); + eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler); finished = true; for (const promise of unconsumedPromises) { @@ -714,8 +725,8 @@ function on(emitter, event) { 'Error', err); } error = err; - emitter.removeListener(event, eventHandler); - emitter.removeListener('error', errorHandler); + eventTargetAgnosticRemoveListener(emitter, event, eventHandler); + eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler); }, [SymbolAsyncIterator]() { @@ -723,8 +734,11 @@ function on(emitter, event) { } }, AsyncIteratorPrototype); - emitter.on(event, eventHandler); - emitter.on('error', errorHandler); + eventTargetAgnosticAddListener(emitter, event, eventHandler); + if (event !== 'error') { + addErrorHandlerIfEventEmitter(emitter, errorHandler); + } + return iterator; diff --git a/test/parallel/test-event-on-async-iterator.js b/test/parallel/test-event-on-async-iterator.js index ff5d8cdaf2aea0..9fc77961dca9d4 100644 --- a/test/parallel/test-event-on-async-iterator.js +++ b/test/parallel/test-event-on-async-iterator.js @@ -3,6 +3,11 @@ const common = require('../common'); const assert = require('assert'); const { on, EventEmitter } = require('events'); +const { + EventTarget, + NodeEventTarget, + Event +} = require('internal/event_target'); async function basic() { const ee = new EventEmitter(); @@ -204,6 +209,45 @@ async function iterableThrow() { assert.strictEqual(ee.listenerCount('error'), 0); } +async function eventTarget() { + const et = new EventTarget(); + const tick = () => et.dispatchEvent(new Event('tick')); + const interval = setInterval(tick, 0); + let count = 0; + for await (const [ event ] of on(et, 'tick')) { + count++; + assert.strictEqual(event.type, 'tick'); + if (count >= 5) { + break; + } + } + assert.strictEqual(count, 5); + clearInterval(interval); +} + +async function errorListenerCount() { + const et = new EventEmitter(); + on(et, 'foo'); + assert.strictEqual(et.listenerCount('error'), 1); +} + +async function nodeEventTarget() { + const et = new NodeEventTarget(); + const tick = () => et.dispatchEvent(new Event('tick')); + const interval = setInterval(tick, 0); + let count = 0; + for await (const [ event] of on(et, 'tick')) { + count++; + assert.strictEqual(event.type, 'tick'); + if (count >= 5) { + break; + } + } + assert.strictEqual(count, 5); + clearInterval(interval); +} + + async function run() { const funcs = [ basic, @@ -212,7 +256,10 @@ async function run() { throwInLoop, next, nextError, - iterableThrow + iterableThrow, + eventTarget, + errorListenerCount, + nodeEventTarget ]; for (const fn of funcs) { diff --git a/test/parallel/test-events-once.js b/test/parallel/test-events-once.js index fea143f5877cc7..658a9964be73e1 100644 --- a/test/parallel/test-events-once.js +++ b/test/parallel/test-events-once.js @@ -1,55 +1,10 @@ 'use strict'; +// Flags: --expose-internals const common = require('../common'); const { once, EventEmitter } = require('events'); -const { strictEqual, deepStrictEqual } = require('assert'); - -class EventTargetMock { - constructor() { - this.events = {}; - } - - addEventListener = common.mustCall(function(name, listener, options) { - if (!(name in this.events)) { - this.events[name] = { listeners: [], options }; - } - this.events[name].listeners.push(listener); - }); - - removeEventListener = common.mustCall(function(name, callback) { - if (!(name in this.events)) { - return; - } - const event = this.events[name]; - const stack = event.listeners; - - for (let i = 0, l = stack.length; i < l; i++) { - if (stack[i] === callback) { - stack.splice(i, 1); - if (stack.length === 0) { - Reflect.deleteProperty(this.events, name); - } - return; - } - } - }); - - dispatchEvent = function(name, ...arg) { - if (!(name in this.events)) { - return true; - } - const event = this.events[name]; - const stack = event.listeners.slice(); - - for (let i = 0, l = stack.length; i < l; i++) { - stack[i].apply(this, arg); - if (event.options.once) { - this.removeEventListener(name, stack[i]); - } - } - return !name.defaultPrevented; - }; -} +const { strictEqual, deepStrictEqual, fail } = require('assert'); +const { EventTarget, Event } = require('internal/event_target'); async function onceAnEvent() { const ee = new EventEmitter(); @@ -104,8 +59,6 @@ async function stopListeningAfterCatchingError() { ee.emit('myevent', 42, 24); }); - process.on('multipleResolves', common.mustNotCall()); - try { await once(ee, 'myevent'); } catch (_e) { @@ -125,47 +78,42 @@ async function onceError() { ee.emit('error', expected); }); - const [err] = await once(ee, 'error'); + const promise = once(ee, 'error'); + strictEqual(ee.listenerCount('error'), 1); + const [ err ] = await promise; strictEqual(err, expected); strictEqual(ee.listenerCount('error'), 0); strictEqual(ee.listenerCount('myevent'), 0); } async function onceWithEventTarget() { - const et = new EventTargetMock(); - + const et = new EventTarget(); + const event = new Event('myevent'); process.nextTick(() => { - et.dispatchEvent('myevent', 42); + et.dispatchEvent(event); }); const [ value ] = await once(et, 'myevent'); - strictEqual(value, 42); - strictEqual(Reflect.has(et.events, 'myevent'), false); -} - -async function onceWithEventTargetTwoArgs() { - const et = new EventTargetMock(); - - process.nextTick(() => { - et.dispatchEvent('myevent', 42, 24); - }); - - const value = await once(et, 'myevent'); - deepStrictEqual(value, [42, 24]); + strictEqual(value, event); } async function onceWithEventTargetError() { - const et = new EventTargetMock(); - - const expected = new Error('kaboom'); + const et = new EventTarget(); + const error = new Event('error'); process.nextTick(() => { - et.dispatchEvent('error', expected); + et.dispatchEvent(error); }); - const [err] = await once(et, 'error'); - strictEqual(err, expected); - strictEqual(Reflect.has(et.events, 'error'), false); + const [ err ] = await once(et, 'error'); + strictEqual(err, error); } +async function prioritizesEventEmitter() { + const ee = new EventEmitter(); + ee.addEventListener = fail; + ee.removeAllListeners = fail; + process.nextTick(() => ee.emit('foo')); + await once(ee, 'foo'); +} Promise.all([ onceAnEvent(), onceAnEventWithTwoArgs(), @@ -173,6 +121,6 @@ Promise.all([ stopListeningAfterCatchingError(), onceError(), onceWithEventTarget(), - onceWithEventTargetTwoArgs(), onceWithEventTargetError(), + prioritizesEventEmitter(), ]).then(common.mustCall());